From a55602df4a8c25559798dbcb1870bfa444e13d66 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Mon, 1 Apr 2013 13:22:38 +0200 Subject: [PATCH] Validate diskFrontier domain for series candidate. It is the case with the benchmark tool that we thought that we generated multiple series and saved them to the disk as such, when in reality, we overwrote the fields of the outgoing metrics via Go map reference behavior. This was accidental. In the course of diagnosing this, a few errors were found: 1. ``newSeriesFrontier`` should check to see if the candidate fingerprint is within the given domain of the ``diskFrontier``. If not, as the contract in the docstring stipulates, a ``nil`` ``seriesFrontier`` should be emitted. 2. In the interests of aiding debugging, the raw LevelDB ``levigoIterator`` type now includes a helpful forensics ``String()`` method. This work produced additional cleanups: 1. ``Close() error`` with the storage stack is technically incorrect, since nowhere in the bowels of it does an error actually occur. The interface has been simplified to remove this for now. --- storage/metric/frontier.go | 16 +++++- storage/metric/interface.go | 2 +- storage/metric/leveldb.go | 76 +++++++------------------ storage/metric/memory.go | 4 +- storage/metric/rule_integration_test.go | 27 ++------- storage/metric/stochastic_test.go | 7 +-- storage/metric/test_helper.go | 14 +---- storage/metric/tiered.go | 20 ++----- storage/raw/index/interface.go | 2 +- storage/raw/index/leveldb/leveldb.go | 4 +- storage/raw/interface.go | 11 ++-- storage/raw/leveldb/batch.go | 4 +- storage/raw/leveldb/iterator.go | 1 + storage/raw/leveldb/leveldb.go | 75 ++++++++++++++++++------ storage/raw/leveldb/test/fixtures.go | 8 +-- 15 files changed, 118 insertions(+), 153 deletions(-) diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index ecc5c9321..dcaac0949 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -35,11 +35,11 @@ type diskFrontier struct { lastSupertime time.Time } -func (f *diskFrontier) String() string { +func (f diskFrontier) String() string { return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint.ToRowKey(), f.firstSupertime, f.lastFingerprint.ToRowKey(), f.lastSupertime) } -func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool { +func (f diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool { return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) } @@ -48,12 +48,15 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { if !i.SeekToLast() || i.Key() == nil { return } + lastKey, err := extractSampleKey(i) if err != nil { panic(err) } - i.SeekToFirst() + if !i.SeekToFirst() || i.Key() == nil { + return + } firstKey, err := extractSampleKey(i) if i.Key() == nil { return @@ -92,6 +95,13 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) upperSeek = lastSupertime ) + // If the diskFrontier for this iterator says that the candidate fingerprint + // is outside of its seeking domain, there is no way that a seriesFrontier + // could be materialized. Simply bail. + if !d.ContainsFingerprint(f) { + return + } + // If we are either the first or the last key in the database, we need to use // pessimistic boundary frontiers. if f.Equal(d.firstFingerprint) { diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 8188bac5e..27358c657 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -24,7 +24,7 @@ import ( type MetricPersistence interface { // A storage system may rely on external resources and thusly should be // closed when finished. - Close() error + Close() // Commit all pending operations, if any, since some of the storage components // queue work on channels and operate on it in bulk. diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d1d59f98d..fef8e4e89 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -24,7 +24,6 @@ import ( index "github.com/prometheus/prometheus/storage/raw/index/leveldb" leveldb "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" - "io" "log" "sort" "sync" @@ -58,68 +57,31 @@ var ( ) type leveldbOpener func() +type leveldbCloser interface { + Close() +} -func (l *LevelDBMetricPersistence) Close() error { - var persistences = []struct { - name string - closer io.Closer - }{ - { - "Fingerprint to Label Name and Value Pairs", - l.fingerprintToMetrics, - }, - { - "Fingerprint High Watermarks", - l.metricHighWatermarks, - }, - { - "Fingerprint Samples", - l.metricSamples, - }, - { - "Label Name to Fingerprints", - l.labelNameToFingerprints, - }, - { - "Label Name and Value Pairs to Fingerprints", - l.labelSetToFingerprints, - }, - { - "Metric Membership Index", - l.metricMembershipIndex, - }, +func (l *LevelDBMetricPersistence) Close() { + var persistences = []leveldbCloser{ + l.fingerprintToMetrics, + l.metricHighWatermarks, + l.metricSamples, + l.labelNameToFingerprints, + l.labelSetToFingerprints, + l.metricMembershipIndex, } - errorChannel := make(chan error, len(persistences)) + closerGroup := sync.WaitGroup{} - for _, persistence := range persistences { - name := persistence.name - closer := persistence.closer - - go func(name string, closer io.Closer) { - if closer != nil { - closingError := closer.Close() - - if closingError != nil { - log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError) - } - - errorChannel <- closingError - } else { - errorChannel <- nil - } - }(name, closer) + for _, closer := range persistences { + closerGroup.Add(1) + go func(closer leveldbCloser) { + closer.Close() + closerGroup.Done() + }(closer) } - for i := 0; i < cap(errorChannel); i++ { - closingError := <-errorChannel - - if closingError != nil { - return closingError - } - } - - return nil + closerGroup.Wait() } func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) { diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 7d4c2fb1c..b62263de3 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -327,7 +327,7 @@ func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interv return } -func (s memorySeriesStorage) Close() (err error) { +func (s memorySeriesStorage) Close() { // This can probably be simplified: // // s.fingerPrintToSeries = map[model.Fingerprint]*stream{} @@ -344,8 +344,6 @@ func (s memorySeriesStorage) Close() (err error) { for labelName := range s.labelNameToFingerprints { delete(s.labelNameToFingerprints, labelName) } - - return } func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index 21138a89f..65dcec2ea 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -552,13 +552,8 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer func() { p, closer := persistenceMaker() - defer func() { - defer closer.Close() - err := p.Close() - if err != nil { - t.Fatalf("Encountered anomaly closing persistence: %q\n", err) - } - }() + defer closer.Close() + defer p.Close() m := model.Metric{ model.MetricNameLabel: "age_in_years", @@ -994,13 +989,8 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo func() { p, closer := persistenceMaker() - defer func() { - defer closer.Close() - err := p.Close() - if err != nil { - t.Fatalf("Encountered anomaly closing persistence: %q\n", err) - } - }() + defer closer.Close() + defer p.Close() m := model.Metric{ model.MetricNameLabel: "age_in_years", @@ -1348,13 +1338,8 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer func() { p, closer := persistenceMaker() - defer func() { - defer closer.Close() - err := p.Close() - if err != nil { - t.Fatalf("Encountered anomaly closing persistence: %q\n", err) - } - }() + defer closer.Close() + defer p.Close() m := model.Metric{ model.MetricNameLabel: "age_in_years", diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 10e0c5450..6045da867 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -189,12 +189,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t stochastic := func(x int) (success bool) { p, closer := persistenceMaker() defer closer.Close() - defer func() { - err := p.Close() - if err != nil { - t.Error(err) - } - }() + defer p.Close() seed := rand.NewSource(int64(x)) random := rand.New(seed) diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index c4c08a668..2a1cd74fc 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -56,12 +56,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err) } - defer func() { - err := p.Close() - if err != nil { - t.Errorf("Anomaly while closing database: %q\n", err) - } - }() + defer p.Close() f(p, t) } @@ -72,12 +67,7 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func p := NewMemorySeriesStorage() - defer func() { - err := p.Close() - if err != nil { - t.Errorf("Anomaly while closing database: %q\n", err) - } - }() + defer p.Close() f(p, t) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 81d0fee67..2ba080e68 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -145,7 +145,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat return } -func (t *tieredStorage) rebuildDiskFrontier() (err error) { +func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { begin := time.Now() defer func() { duration := time.Since(begin) @@ -153,9 +153,6 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) { recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) }() - i := t.diskStorage.metricSamples.NewIterator(true) - defer i.Close() - t.diskFrontier, err = newDiskFrontier(i) if err != nil { panic(err) @@ -298,8 +295,6 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe flusher: f, } - // fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey()) - return visitor, visitor, visitor } @@ -309,11 +304,7 @@ func (f *memoryToDiskFlusher) Flush() { for i := 0; i < length; i++ { samples = append(samples, <-f.toDiskQueue) } - start := time.Now() f.disk.AppendSamples(samples) - if false { - fmt.Printf("Took %s to append...\n", time.Since(start)) - } } func (f memoryToDiskFlusher) Close() { @@ -360,11 +351,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) { var ( scans = viewJob.builder.ScanJobs() view = newView() + // Get a single iterator that will be used for all data extraction below. + iterator = t.diskStorage.metricSamples.NewIterator(true) ) + defer iterator.Close() // Rebuilding of the frontier should happen on a conditional basis if a // (fingerprint, timestamp) tuple is outside of the current frontier. - err = t.rebuildDiskFrontier() + err = t.rebuildDiskFrontier(iterator) if err != nil { panic(err) } @@ -374,10 +368,6 @@ func (t *tieredStorage) renderView(viewJob viewJob) { return } - // Get a single iterator that will be used for all data extraction below. - iterator := t.diskStorage.metricSamples.NewIterator(true) - defer iterator.Close() - for _, scanJob := range scans { seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) if err != nil { diff --git a/storage/raw/index/interface.go b/storage/raw/index/interface.go index 237aab85f..11967fa00 100644 --- a/storage/raw/index/interface.go +++ b/storage/raw/index/interface.go @@ -21,5 +21,5 @@ type MembershipIndex interface { Has(key coding.Encoder) (bool, error) Put(key coding.Encoder) error Drop(key coding.Encoder) error - Close() error + Close() } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index a877a6d87..27303276e 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -28,8 +28,8 @@ type LevelDBMembershipIndex struct { persistence *leveldb.LevelDBPersistence } -func (l *LevelDBMembershipIndex) Close() error { - return l.persistence.Close() +func (l *LevelDBMembershipIndex) Close() { + l.persistence.Close() } func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) { diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 51f99b008..42cb049bb 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -16,14 +16,14 @@ package raw import ( "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" - "io" ) // Persistence models a key-value store for bytes that supports various // additional operations. type Persistence interface { - io.Closer - + // Close reaps all of the underlying system resources associated with this + // persistence. + Close() // Has informs the user whether a given key exists in the database. Has(key coding.Encoder) (bool, error) // Get retrieves the key from the database if it exists or returns nil if @@ -50,8 +50,9 @@ type Persistence interface { // en masse. The interface implies no protocol around the atomicity of // effectuation. type Batch interface { - io.Closer - + // Close reaps all of the underlying system resources associated with this + // batch mutation. + Close() // Put follows the same protocol as Persistence.Put. Put(key, value coding.Encoder) // Drop follows the same protocol as Persistence.Drop. diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index 4af34d9f5..aec4ac547 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -50,8 +50,6 @@ func (b batch) Put(key, value coding.Encoder) { b.batch.Put(keyEncoded, valueEncoded) } -func (b batch) Close() (err error) { +func (b batch) Close() { b.batch.Close() - - return } diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go index 4f47198b4..b352eda16 100644 --- a/storage/raw/leveldb/iterator.go +++ b/storage/raw/leveldb/iterator.go @@ -38,4 +38,5 @@ type Iterator interface { SeekToFirst() (ok bool) SeekToLast() (ok bool) Value() []byte + Close() } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 712d0a99f..901913196 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -15,10 +15,12 @@ package leveldb import ( "flag" + "fmt" "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" + "time" ) var ( @@ -57,9 +59,34 @@ type levigoIterator struct { storage *levigo.DB // closed indicates whether the iterator has been closed before. closed bool + // valid indicates whether the iterator may be used. If a LevelDB iterator + // ever becomes invalid, it must be disposed of and cannot be reused. + valid bool + // creationTime provides the time at which the iterator was made. + creationTime time.Time } -func (i *levigoIterator) Close() (err error) { +func (i levigoIterator) String() string { + var ( + valid = "valid" + open = "open" + snapshotted = "snapshotted" + ) + + if i.closed { + open = "closed" + } + if !i.valid { + valid = "invalid" + } + if i.snapshot == nil { + snapshotted = "unsnapshotted" + } + + return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted) +} + +func (i *levigoIterator) Close() { if i.closed { return } @@ -81,38 +108,49 @@ func (i *levigoIterator) Close() (err error) { i.storage = nil i.closed = true + i.valid = false return } -func (i levigoIterator) Seek(key []byte) (ok bool) { +func (i *levigoIterator) Seek(key []byte) bool { i.iterator.Seek(key) - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) SeekToFirst() (ok bool) { +func (i *levigoIterator) SeekToFirst() bool { i.iterator.SeekToFirst() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) SeekToLast() (ok bool) { +func (i *levigoIterator) SeekToLast() bool { i.iterator.SeekToLast() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) Next() (ok bool) { +func (i *levigoIterator) Next() bool { i.iterator.Next() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } -func (i levigoIterator) Previous() (ok bool) { +func (i *levigoIterator) Previous() bool { i.iterator.Prev() - return i.iterator.Valid() + i.valid = i.iterator.Valid() + + return i.valid } func (i levigoIterator) Key() (key []byte) { @@ -166,7 +204,7 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter return } -func (l *LevelDBPersistence) Close() (err error) { +func (l *LevelDBPersistence) Close() { // These are deferred to take advantage of forced closing in case of stack // unwinding due to anomalies. defer func() { @@ -283,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { // will be leaked. // // The iterator is optionally snapshotable. -func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator { +func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator { var ( snapshot *levigo.Snapshot readOptions *levigo.ReadOptions @@ -299,11 +337,12 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator { iterator = l.storage.NewIterator(l.readOptions) } - return levigoIterator{ - iterator: iterator, - readOptions: readOptions, - snapshot: snapshot, - storage: l.storage, + return &levigoIterator{ + creationTime: time.Now(), + iterator: iterator, + readOptions: readOptions, + snapshot: snapshot, + storage: l.storage, } } diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 2cb8d35d9..f93aecb08 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -68,12 +68,8 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory defer t.Close() p.tester.Fatal(err) } - defer func() { - err = persistence.Close() - if err != nil { - p.tester.Fatal(err) - } - }() + + defer persistence.Close() for f.HasNext() { key, value := f.Next()