diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index ecc5c9321..dcaac0949 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -35,11 +35,11 @@ type diskFrontier struct { lastSupertime time.Time } -func (f *diskFrontier) String() string { +func (f diskFrontier) String() string { return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint.ToRowKey(), f.firstSupertime, f.lastFingerprint.ToRowKey(), f.lastSupertime) } -func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool { +func (f diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool { return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) } @@ -48,12 +48,15 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { if !i.SeekToLast() || i.Key() == nil { return } + lastKey, err := extractSampleKey(i) if err != nil { panic(err) } - i.SeekToFirst() + if !i.SeekToFirst() || i.Key() == nil { + return + } firstKey, err := extractSampleKey(i) if i.Key() == nil { return @@ -92,6 +95,13 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) upperSeek = lastSupertime ) + // If the diskFrontier for this iterator says that the candidate fingerprint + // is outside of its seeking domain, there is no way that a seriesFrontier + // could be materialized. Simply bail. + if !d.ContainsFingerprint(f) { + return + } + // If we are either the first or the last key in the database, we need to use // pessimistic boundary frontiers. if f.Equal(d.firstFingerprint) { diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 8188bac5e..27358c657 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -24,7 +24,7 @@ import ( type MetricPersistence interface { // A storage system may rely on external resources and thusly should be // closed when finished. - Close() error + Close() // Commit all pending operations, if any, since some of the storage components // queue work on channels and operate on it in bulk. diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d1d59f98d..fef8e4e89 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -24,7 +24,6 @@ import ( index "github.com/prometheus/prometheus/storage/raw/index/leveldb" leveldb "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" - "io" "log" "sort" "sync" @@ -58,68 +57,31 @@ var ( ) type leveldbOpener func() +type leveldbCloser interface { + Close() +} -func (l *LevelDBMetricPersistence) Close() error { - var persistences = []struct { - name string - closer io.Closer - }{ - { - "Fingerprint to Label Name and Value Pairs", - l.fingerprintToMetrics, - }, - { - "Fingerprint High Watermarks", - l.metricHighWatermarks, - }, - { - "Fingerprint Samples", - l.metricSamples, - }, - { - "Label Name to Fingerprints", - l.labelNameToFingerprints, - }, - { - "Label Name and Value Pairs to Fingerprints", - l.labelSetToFingerprints, - }, - { - "Metric Membership Index", - l.metricMembershipIndex, - }, +func (l *LevelDBMetricPersistence) Close() { + var persistences = []leveldbCloser{ + l.fingerprintToMetrics, + l.metricHighWatermarks, + l.metricSamples, + l.labelNameToFingerprints, + l.labelSetToFingerprints, + l.metricMembershipIndex, } - errorChannel := make(chan error, len(persistences)) + closerGroup := sync.WaitGroup{} - for _, persistence := range persistences { - name := persistence.name - closer := persistence.closer - - go func(name string, closer io.Closer) { - if closer != nil { - closingError := closer.Close() - - if closingError != nil { - log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError) - } - - errorChannel <- closingError - } else { - errorChannel <- nil - } - }(name, closer) + for _, closer := range persistences { + closerGroup.Add(1) + go func(closer leveldbCloser) { + closer.Close() + closerGroup.Done() + }(closer) } - for i := 0; i < cap(errorChannel); i++ { - closingError := <-errorChannel - - if closingError != nil { - return closingError - } - } - - return nil + closerGroup.Wait() } func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) { diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 7d4c2fb1c..b62263de3 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -327,7 +327,7 @@ func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interv return } -func (s memorySeriesStorage) Close() (err error) { +func (s memorySeriesStorage) Close() { // This can probably be simplified: // // s.fingerPrintToSeries = map[model.Fingerprint]*stream{} @@ -344,8 +344,6 @@ func (s memorySeriesStorage) Close() (err error) { for labelName := range s.labelNameToFingerprints { delete(s.labelNameToFingerprints, labelName) } - - return } func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index 21138a89f..65dcec2ea 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -552,13 +552,8 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer func() { p, closer := persistenceMaker() - defer func() { - defer closer.Close() - err := p.Close() - if err != nil { - t.Fatalf("Encountered anomaly closing persistence: %q\n", err) - } - }() + defer closer.Close() + defer p.Close() m := model.Metric{ model.MetricNameLabel: "age_in_years", @@ -994,13 +989,8 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo func() { p, closer := persistenceMaker() - defer func() { - defer closer.Close() - err := p.Close() - if err != nil { - t.Fatalf("Encountered anomaly closing persistence: %q\n", err) - } - }() + defer closer.Close() + defer p.Close() m := model.Metric{ model.MetricNameLabel: "age_in_years", @@ -1348,13 +1338,8 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer func() { p, closer := persistenceMaker() - defer func() { - defer closer.Close() - err := p.Close() - if err != nil { - t.Fatalf("Encountered anomaly closing persistence: %q\n", err) - } - }() + defer closer.Close() + defer p.Close() m := model.Metric{ model.MetricNameLabel: "age_in_years", diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 10e0c5450..6045da867 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -189,12 +189,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t stochastic := func(x int) (success bool) { p, closer := persistenceMaker() defer closer.Close() - defer func() { - err := p.Close() - if err != nil { - t.Error(err) - } - }() + defer p.Close() seed := rand.NewSource(int64(x)) random := rand.New(seed) diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index c4c08a668..2a1cd74fc 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -56,12 +56,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err) } - defer func() { - err := p.Close() - if err != nil { - t.Errorf("Anomaly while closing database: %q\n", err) - } - }() + defer p.Close() f(p, t) } @@ -72,12 +67,7 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func p := NewMemorySeriesStorage() - defer func() { - err := p.Close() - if err != nil { - t.Errorf("Anomaly while closing database: %q\n", err) - } - }() + defer p.Close() f(p, t) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 81d0fee67..2ba080e68 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -145,7 +145,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat return } -func (t *tieredStorage) rebuildDiskFrontier() (err error) { +func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { begin := time.Now() defer func() { duration := time.Since(begin) @@ -153,9 +153,6 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) { recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) }() - i := t.diskStorage.metricSamples.NewIterator(true) - defer i.Close() - t.diskFrontier, err = newDiskFrontier(i) if err != nil { panic(err) @@ -298,8 +295,6 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe flusher: f, } - // fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey()) - return visitor, visitor, visitor } @@ -309,11 +304,7 @@ func (f *memoryToDiskFlusher) Flush() { for i := 0; i < length; i++ { samples = append(samples, <-f.toDiskQueue) } - start := time.Now() f.disk.AppendSamples(samples) - if false { - fmt.Printf("Took %s to append...\n", time.Since(start)) - } } func (f memoryToDiskFlusher) Close() { @@ -360,11 +351,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) { var ( scans = viewJob.builder.ScanJobs() view = newView() + // Get a single iterator that will be used for all data extraction below. + iterator = t.diskStorage.metricSamples.NewIterator(true) ) + defer iterator.Close() // Rebuilding of the frontier should happen on a conditional basis if a // (fingerprint, timestamp) tuple is outside of the current frontier. - err = t.rebuildDiskFrontier() + err = t.rebuildDiskFrontier(iterator) if err != nil { panic(err) } @@ -374,10 +368,6 @@ func (t *tieredStorage) renderView(viewJob viewJob) { return } - // Get a single iterator that will be used for all data extraction below. - iterator := t.diskStorage.metricSamples.NewIterator(true) - defer iterator.Close() - for _, scanJob := range scans { seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) if err != nil { diff --git a/storage/raw/index/interface.go b/storage/raw/index/interface.go index 237aab85f..11967fa00 100644 --- a/storage/raw/index/interface.go +++ b/storage/raw/index/interface.go @@ -21,5 +21,5 @@ type MembershipIndex interface { Has(key coding.Encoder) (bool, error) Put(key coding.Encoder) error Drop(key coding.Encoder) error - Close() error + Close() } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index a877a6d87..27303276e 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -28,8 +28,8 @@ type LevelDBMembershipIndex struct { persistence *leveldb.LevelDBPersistence } -func (l *LevelDBMembershipIndex) Close() error { - return l.persistence.Close() +func (l *LevelDBMembershipIndex) Close() { + l.persistence.Close() } func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) { diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 51f99b008..42cb049bb 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -16,14 +16,14 @@ package raw import ( "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" - "io" ) // Persistence models a key-value store for bytes that supports various // additional operations. type Persistence interface { - io.Closer - + // Close reaps all of the underlying system resources associated with this + // persistence. + Close() // Has informs the user whether a given key exists in the database. Has(key coding.Encoder) (bool, error) // Get retrieves the key from the database if it exists or returns nil if @@ -50,8 +50,9 @@ type Persistence interface { // en masse. The interface implies no protocol around the atomicity of // effectuation. type Batch interface { - io.Closer - + // Close reaps all of the underlying system resources associated with this + // batch mutation. + Close() // Put follows the same protocol as Persistence.Put. Put(key, value coding.Encoder) // Drop follows the same protocol as Persistence.Drop. diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index 4af34d9f5..aec4ac547 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -50,8 +50,6 @@ func (b batch) Put(key, value coding.Encoder) { b.batch.Put(keyEncoded, valueEncoded) } -func (b batch) Close() (err error) { +func (b batch) Close() { b.batch.Close() - - return } diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go index 4f47198b4..b352eda16 100644 --- a/storage/raw/leveldb/iterator.go +++ b/storage/raw/leveldb/iterator.go @@ -38,4 +38,5 @@ type Iterator interface { SeekToFirst() (ok bool) SeekToLast() (ok bool) Value() []byte + Close() } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 712d0a99f..901913196 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -15,10 +15,12 @@ package leveldb import ( "flag" + "fmt" "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" + "time" ) var ( @@ -57,9 +59,34 @@ type levigoIterator struct { storage *levigo.DB // closed indicates whether the iterator has been closed before. closed bool + // valid indicates whether the iterator may be used. If a LevelDB iterator + // ever becomes invalid, it must be disposed of and cannot be reused. + valid bool + // creationTime provides the time at which the iterator was made. + creationTime time.Time } -func (i *levigoIterator) Close() (err error) { +func (i levigoIterator) String() string { + var ( + valid = "valid" + open = "open" + snapshotted = "snapshotted" + ) + + if i.closed { + open = "closed" + } + if !i.valid { + valid = "invalid" + } + if i.snapshot == nil { + snapshotted = "unsnapshotted" + } + + return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted) +} + +func (i *levigoIterator) Close() { if i.closed { return } @@ -81,38 +108,49 @@ func (i *levigoIterator) Close() (err error) { i.storage = nil i.closed = true + i.valid = false return } -func (i levigoIterator) Seek(key []byte) (ok bool) { +func (i *levigoIterator) Seek(key []byte) bool { i.iterator.Seek(key) - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) SeekToFirst() (ok bool) { +func (i *levigoIterator) SeekToFirst() bool { i.iterator.SeekToFirst() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) SeekToLast() (ok bool) { +func (i *levigoIterator) SeekToLast() bool { i.iterator.SeekToLast() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) Next() (ok bool) { +func (i *levigoIterator) Next() bool { i.iterator.Next() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) Previous() (ok bool) { +func (i *levigoIterator) Previous() bool { i.iterator.Prev() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } func (i levigoIterator) Key() (key []byte) { @@ -166,7 +204,7 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter return } -func (l *LevelDBPersistence) Close() (err error) { +func (l *LevelDBPersistence) Close() { // These are deferred to take advantage of forced closing in case of stack // unwinding due to anomalies. defer func() { @@ -283,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { // will be leaked. // // The iterator is optionally snapshotable. -func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator { +func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator { var ( snapshot *levigo.Snapshot readOptions *levigo.ReadOptions @@ -299,11 +337,12 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator { iterator = l.storage.NewIterator(l.readOptions) } - return levigoIterator{ - iterator: iterator, - readOptions: readOptions, - snapshot: snapshot, - storage: l.storage, + return &levigoIterator{ + creationTime: time.Now(), + iterator: iterator, + readOptions: readOptions, + snapshot: snapshot, + storage: l.storage, } } diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 2cb8d35d9..f93aecb08 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -68,12 +68,8 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory defer t.Close() p.tester.Fatal(err) } - defer func() { - err = persistence.Close() - if err != nil { - p.tester.Fatal(err) - } - }() + + defer persistence.Close() for f.HasNext() { key, value := f.Next()