Merge pull request #115 from prometheus/fix/storage/nil-behaviors

Validate diskFrontier domain for series candidate.
This commit is contained in:
Matt T. Proud 2013-04-09 03:08:29 -07:00
commit 6146116e2f
15 changed files with 118 additions and 153 deletions

View file

@ -35,11 +35,11 @@ type diskFrontier struct {
lastSupertime time.Time
}
func (f *diskFrontier) String() string {
func (f diskFrontier) String() string {
return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint.ToRowKey(), f.firstSupertime, f.lastFingerprint.ToRowKey(), f.lastSupertime)
}
func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
func (f diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
}
@ -48,12 +48,15 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
if !i.SeekToLast() || i.Key() == nil {
return
}
lastKey, err := extractSampleKey(i)
if err != nil {
panic(err)
}
i.SeekToFirst()
if !i.SeekToFirst() || i.Key() == nil {
return
}
firstKey, err := extractSampleKey(i)
if i.Key() == nil {
return
@ -92,6 +95,13 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
upperSeek = lastSupertime
)
// If the diskFrontier for this iterator says that the candidate fingerprint
// is outside of its seeking domain, there is no way that a seriesFrontier
// could be materialized. Simply bail.
if !d.ContainsFingerprint(f) {
return
}
// If we are either the first or the last key in the database, we need to use
// pessimistic boundary frontiers.
if f.Equal(d.firstFingerprint) {

View file

@ -24,7 +24,7 @@ import (
type MetricPersistence interface {
// A storage system may rely on external resources and thusly should be
// closed when finished.
Close() error
Close()
// Commit all pending operations, if any, since some of the storage components
// queue work on channels and operate on it in bulk.

View file

@ -24,7 +24,6 @@ import (
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
leveldb "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
"io"
"log"
"sort"
"sync"
@ -58,68 +57,31 @@ var (
)
type leveldbOpener func()
type leveldbCloser interface {
Close()
}
func (l *LevelDBMetricPersistence) Close() error {
var persistences = []struct {
name string
closer io.Closer
}{
{
"Fingerprint to Label Name and Value Pairs",
l.fingerprintToMetrics,
},
{
"Fingerprint High Watermarks",
l.metricHighWatermarks,
},
{
"Fingerprint Samples",
l.metricSamples,
},
{
"Label Name to Fingerprints",
l.labelNameToFingerprints,
},
{
"Label Name and Value Pairs to Fingerprints",
l.labelSetToFingerprints,
},
{
"Metric Membership Index",
l.metricMembershipIndex,
},
func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{
l.fingerprintToMetrics,
l.metricHighWatermarks,
l.metricSamples,
l.labelNameToFingerprints,
l.labelSetToFingerprints,
l.metricMembershipIndex,
}
errorChannel := make(chan error, len(persistences))
closerGroup := sync.WaitGroup{}
for _, persistence := range persistences {
name := persistence.name
closer := persistence.closer
go func(name string, closer io.Closer) {
if closer != nil {
closingError := closer.Close()
if closingError != nil {
log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError)
}
errorChannel <- closingError
} else {
errorChannel <- nil
}
}(name, closer)
for _, closer := range persistences {
closerGroup.Add(1)
go func(closer leveldbCloser) {
closer.Close()
closerGroup.Done()
}(closer)
}
for i := 0; i < cap(errorChannel); i++ {
closingError := <-errorChannel
if closingError != nil {
return closingError
}
}
return nil
closerGroup.Wait()
}
func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {

View file

@ -327,7 +327,7 @@ func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interv
return
}
func (s memorySeriesStorage) Close() (err error) {
func (s memorySeriesStorage) Close() {
// This can probably be simplified:
//
// s.fingerPrintToSeries = map[model.Fingerprint]*stream{}
@ -344,8 +344,6 @@ func (s memorySeriesStorage) Close() (err error) {
for labelName := range s.labelNameToFingerprints {
delete(s.labelNameToFingerprints, labelName)
}
return
}
func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {

View file

@ -552,13 +552,8 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
func() {
p, closer := persistenceMaker()
defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer closer.Close()
defer p.Close()
m := model.Metric{
model.MetricNameLabel: "age_in_years",
@ -994,13 +989,8 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo
func() {
p, closer := persistenceMaker()
defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer closer.Close()
defer p.Close()
m := model.Metric{
model.MetricNameLabel: "age_in_years",
@ -1348,13 +1338,8 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
func() {
p, closer := persistenceMaker()
defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer closer.Close()
defer p.Close()
m := model.Metric{
model.MetricNameLabel: "age_in_years",

View file

@ -189,12 +189,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
stochastic := func(x int) (success bool) {
p, closer := persistenceMaker()
defer closer.Close()
defer func() {
err := p.Close()
if err != nil {
t.Error(err)
}
}()
defer p.Close()
seed := rand.NewSource(int64(x))
random := rand.New(seed)

View file

@ -56,12 +56,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err)
}
defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
defer p.Close()
f(p, t)
}
@ -72,12 +67,7 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func
p := NewMemorySeriesStorage()
defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
defer p.Close()
f(p, t)
}

View file

@ -145,7 +145,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return
}
func (t *tieredStorage) rebuildDiskFrontier() (err error) {
func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
@ -153,9 +153,6 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) {
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
i := t.diskStorage.metricSamples.NewIterator(true)
defer i.Close()
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
@ -298,8 +295,6 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe
flusher: f,
}
// fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey())
return visitor, visitor, visitor
}
@ -309,11 +304,7 @@ func (f *memoryToDiskFlusher) Flush() {
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue)
}
start := time.Now()
f.disk.AppendSamples(samples)
if false {
fmt.Printf("Took %s to append...\n", time.Since(start))
}
}
func (f memoryToDiskFlusher) Close() {
@ -360,11 +351,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
var (
scans = viewJob.builder.ScanJobs()
view = newView()
// Get a single iterator that will be used for all data extraction below.
iterator = t.diskStorage.metricSamples.NewIterator(true)
)
defer iterator.Close()
// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
err = t.rebuildDiskFrontier()
err = t.rebuildDiskFrontier(iterator)
if err != nil {
panic(err)
}
@ -374,10 +368,6 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
return
}
// Get a single iterator that will be used for all data extraction below.
iterator := t.diskStorage.metricSamples.NewIterator(true)
defer iterator.Close()
for _, scanJob := range scans {
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
if err != nil {

View file

@ -21,5 +21,5 @@ type MembershipIndex interface {
Has(key coding.Encoder) (bool, error)
Put(key coding.Encoder) error
Drop(key coding.Encoder) error
Close() error
Close()
}

View file

@ -28,8 +28,8 @@ type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence
}
func (l *LevelDBMembershipIndex) Close() error {
return l.persistence.Close()
func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close()
}
func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) {

View file

@ -16,14 +16,14 @@ package raw
import (
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"io"
)
// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface {
io.Closer
// Close reaps all of the underlying system resources associated with this
// persistence.
Close()
// Has informs the user whether a given key exists in the database.
Has(key coding.Encoder) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if
@ -50,8 +50,9 @@ type Persistence interface {
// en masse. The interface implies no protocol around the atomicity of
// effectuation.
type Batch interface {
io.Closer
// Close reaps all of the underlying system resources associated with this
// batch mutation.
Close()
// Put follows the same protocol as Persistence.Put.
Put(key, value coding.Encoder)
// Drop follows the same protocol as Persistence.Drop.

View file

@ -50,8 +50,6 @@ func (b batch) Put(key, value coding.Encoder) {
b.batch.Put(keyEncoded, valueEncoded)
}
func (b batch) Close() (err error) {
func (b batch) Close() {
b.batch.Close()
return
}

View file

@ -38,4 +38,5 @@ type Iterator interface {
SeekToFirst() (ok bool)
SeekToLast() (ok bool)
Value() []byte
Close()
}

View file

@ -15,10 +15,12 @@ package leveldb
import (
"flag"
"fmt"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"time"
)
var (
@ -57,9 +59,34 @@ type levigoIterator struct {
storage *levigo.DB
// closed indicates whether the iterator has been closed before.
closed bool
// valid indicates whether the iterator may be used. If a LevelDB iterator
// ever becomes invalid, it must be disposed of and cannot be reused.
valid bool
// creationTime provides the time at which the iterator was made.
creationTime time.Time
}
func (i *levigoIterator) Close() (err error) {
func (i levigoIterator) String() string {
var (
valid = "valid"
open = "open"
snapshotted = "snapshotted"
)
if i.closed {
open = "closed"
}
if !i.valid {
valid = "invalid"
}
if i.snapshot == nil {
snapshotted = "unsnapshotted"
}
return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted)
}
func (i *levigoIterator) Close() {
if i.closed {
return
}
@ -81,38 +108,49 @@ func (i *levigoIterator) Close() (err error) {
i.storage = nil
i.closed = true
i.valid = false
return
}
func (i levigoIterator) Seek(key []byte) (ok bool) {
func (i *levigoIterator) Seek(key []byte) bool {
i.iterator.Seek(key)
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) SeekToFirst() (ok bool) {
func (i *levigoIterator) SeekToFirst() bool {
i.iterator.SeekToFirst()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) SeekToLast() (ok bool) {
func (i *levigoIterator) SeekToLast() bool {
i.iterator.SeekToLast()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) Next() (ok bool) {
func (i *levigoIterator) Next() bool {
i.iterator.Next()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) Previous() (ok bool) {
func (i *levigoIterator) Previous() bool {
i.iterator.Prev()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) Key() (key []byte) {
@ -166,7 +204,7 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
return
}
func (l *LevelDBPersistence) Close() (err error) {
func (l *LevelDBPersistence) Close() {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
defer func() {
@ -283,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
// will be leaked.
//
// The iterator is optionally snapshotable.
func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator {
func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator {
var (
snapshot *levigo.Snapshot
readOptions *levigo.ReadOptions
@ -299,11 +337,12 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator {
iterator = l.storage.NewIterator(l.readOptions)
}
return levigoIterator{
iterator: iterator,
readOptions: readOptions,
snapshot: snapshot,
storage: l.storage,
return &levigoIterator{
creationTime: time.Now(),
iterator: iterator,
readOptions: readOptions,
snapshot: snapshot,
storage: l.storage,
}
}

View file

@ -68,12 +68,8 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory
defer t.Close()
p.tester.Fatal(err)
}
defer func() {
err = persistence.Close()
if err != nil {
p.tester.Fatal(err)
}
}()
defer persistence.Close()
for f.HasNext() {
key, value := f.Next()