Validate diskFrontier domain for series candidate.

It is the case with the benchmark tool that we thought that we
generated multiple series and saved them to the disk as such, when
in reality, we overwrote the fields of the outgoing metrics via
Go map reference behavior.  This was accidental.  In the course of
diagnosing this, a few errors were found:

1. ``newSeriesFrontier`` should check to see if the candidate fingerprint is within the given domain of the ``diskFrontier``.  If not, as the contract in the docstring stipulates, a ``nil`` ``seriesFrontier`` should be emitted.

2. In the interests of aiding debugging, the raw LevelDB ``levigoIterator`` type now includes a helpful forensics ``String()`` method.

This work produced additional cleanups:

1. ``Close() error`` with the storage stack is technically incorrect, since nowhere in the bowels of it does an error actually occur.  The interface has been simplified to remove this for now.
This commit is contained in:
Matt T. Proud 2013-04-01 13:22:38 +02:00
parent e254c0bc33
commit a55602df4a
15 changed files with 118 additions and 153 deletions

View file

@ -35,11 +35,11 @@ type diskFrontier struct {
lastSupertime time.Time
}
func (f *diskFrontier) String() string {
func (f diskFrontier) String() string {
return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint.ToRowKey(), f.firstSupertime, f.lastFingerprint.ToRowKey(), f.lastSupertime)
}
func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
func (f diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
}
@ -48,12 +48,15 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
if !i.SeekToLast() || i.Key() == nil {
return
}
lastKey, err := extractSampleKey(i)
if err != nil {
panic(err)
}
i.SeekToFirst()
if !i.SeekToFirst() || i.Key() == nil {
return
}
firstKey, err := extractSampleKey(i)
if i.Key() == nil {
return
@ -92,6 +95,13 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
upperSeek = lastSupertime
)
// If the diskFrontier for this iterator says that the candidate fingerprint
// is outside of its seeking domain, there is no way that a seriesFrontier
// could be materialized. Simply bail.
if !d.ContainsFingerprint(f) {
return
}
// If we are either the first or the last key in the database, we need to use
// pessimistic boundary frontiers.
if f.Equal(d.firstFingerprint) {

View file

@ -24,7 +24,7 @@ import (
type MetricPersistence interface {
// A storage system may rely on external resources and thusly should be
// closed when finished.
Close() error
Close()
// Commit all pending operations, if any, since some of the storage components
// queue work on channels and operate on it in bulk.

View file

@ -24,7 +24,6 @@ import (
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
leveldb "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
"io"
"log"
"sort"
"sync"
@ -58,68 +57,31 @@ var (
)
type leveldbOpener func()
type leveldbCloser interface {
Close()
}
func (l *LevelDBMetricPersistence) Close() error {
var persistences = []struct {
name string
closer io.Closer
}{
{
"Fingerprint to Label Name and Value Pairs",
func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{
l.fingerprintToMetrics,
},
{
"Fingerprint High Watermarks",
l.metricHighWatermarks,
},
{
"Fingerprint Samples",
l.metricSamples,
},
{
"Label Name to Fingerprints",
l.labelNameToFingerprints,
},
{
"Label Name and Value Pairs to Fingerprints",
l.labelSetToFingerprints,
},
{
"Metric Membership Index",
l.metricMembershipIndex,
},
}
errorChannel := make(chan error, len(persistences))
closerGroup := sync.WaitGroup{}
for _, persistence := range persistences {
name := persistence.name
closer := persistence.closer
go func(name string, closer io.Closer) {
if closer != nil {
closingError := closer.Close()
if closingError != nil {
log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError)
for _, closer := range persistences {
closerGroup.Add(1)
go func(closer leveldbCloser) {
closer.Close()
closerGroup.Done()
}(closer)
}
errorChannel <- closingError
} else {
errorChannel <- nil
}
}(name, closer)
}
for i := 0; i < cap(errorChannel); i++ {
closingError := <-errorChannel
if closingError != nil {
return closingError
}
}
return nil
closerGroup.Wait()
}
func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {

View file

@ -327,7 +327,7 @@ func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interv
return
}
func (s memorySeriesStorage) Close() (err error) {
func (s memorySeriesStorage) Close() {
// This can probably be simplified:
//
// s.fingerPrintToSeries = map[model.Fingerprint]*stream{}
@ -344,8 +344,6 @@ func (s memorySeriesStorage) Close() (err error) {
for labelName := range s.labelNameToFingerprints {
delete(s.labelNameToFingerprints, labelName)
}
return
}
func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {

View file

@ -552,13 +552,8 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
func() {
p, closer := persistenceMaker()
defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer p.Close()
m := model.Metric{
model.MetricNameLabel: "age_in_years",
@ -994,13 +989,8 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo
func() {
p, closer := persistenceMaker()
defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer p.Close()
m := model.Metric{
model.MetricNameLabel: "age_in_years",
@ -1348,13 +1338,8 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
func() {
p, closer := persistenceMaker()
defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer p.Close()
m := model.Metric{
model.MetricNameLabel: "age_in_years",

View file

@ -189,12 +189,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
stochastic := func(x int) (success bool) {
p, closer := persistenceMaker()
defer closer.Close()
defer func() {
err := p.Close()
if err != nil {
t.Error(err)
}
}()
defer p.Close()
seed := rand.NewSource(int64(x))
random := rand.New(seed)

View file

@ -56,12 +56,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err)
}
defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
defer p.Close()
f(p, t)
}
@ -72,12 +67,7 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func
p := NewMemorySeriesStorage()
defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
defer p.Close()
f(p, t)
}

View file

@ -145,7 +145,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return
}
func (t *tieredStorage) rebuildDiskFrontier() (err error) {
func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
@ -153,9 +153,6 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) {
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
i := t.diskStorage.metricSamples.NewIterator(true)
defer i.Close()
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
@ -298,8 +295,6 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe
flusher: f,
}
// fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey())
return visitor, visitor, visitor
}
@ -309,11 +304,7 @@ func (f *memoryToDiskFlusher) Flush() {
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue)
}
start := time.Now()
f.disk.AppendSamples(samples)
if false {
fmt.Printf("Took %s to append...\n", time.Since(start))
}
}
func (f memoryToDiskFlusher) Close() {
@ -360,11 +351,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
var (
scans = viewJob.builder.ScanJobs()
view = newView()
// Get a single iterator that will be used for all data extraction below.
iterator = t.diskStorage.metricSamples.NewIterator(true)
)
defer iterator.Close()
// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
err = t.rebuildDiskFrontier()
err = t.rebuildDiskFrontier(iterator)
if err != nil {
panic(err)
}
@ -374,10 +368,6 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
return
}
// Get a single iterator that will be used for all data extraction below.
iterator := t.diskStorage.metricSamples.NewIterator(true)
defer iterator.Close()
for _, scanJob := range scans {
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
if err != nil {

View file

@ -21,5 +21,5 @@ type MembershipIndex interface {
Has(key coding.Encoder) (bool, error)
Put(key coding.Encoder) error
Drop(key coding.Encoder) error
Close() error
Close()
}

View file

@ -28,8 +28,8 @@ type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence
}
func (l *LevelDBMembershipIndex) Close() error {
return l.persistence.Close()
func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close()
}
func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) {

View file

@ -16,14 +16,14 @@ package raw
import (
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"io"
)
// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface {
io.Closer
// Close reaps all of the underlying system resources associated with this
// persistence.
Close()
// Has informs the user whether a given key exists in the database.
Has(key coding.Encoder) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if
@ -50,8 +50,9 @@ type Persistence interface {
// en masse. The interface implies no protocol around the atomicity of
// effectuation.
type Batch interface {
io.Closer
// Close reaps all of the underlying system resources associated with this
// batch mutation.
Close()
// Put follows the same protocol as Persistence.Put.
Put(key, value coding.Encoder)
// Drop follows the same protocol as Persistence.Drop.

View file

@ -50,8 +50,6 @@ func (b batch) Put(key, value coding.Encoder) {
b.batch.Put(keyEncoded, valueEncoded)
}
func (b batch) Close() (err error) {
func (b batch) Close() {
b.batch.Close()
return
}

View file

@ -38,4 +38,5 @@ type Iterator interface {
SeekToFirst() (ok bool)
SeekToLast() (ok bool)
Value() []byte
Close()
}

View file

@ -15,10 +15,12 @@ package leveldb
import (
"flag"
"fmt"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"time"
)
var (
@ -57,9 +59,34 @@ type levigoIterator struct {
storage *levigo.DB
// closed indicates whether the iterator has been closed before.
closed bool
// valid indicates whether the iterator may be used. If a LevelDB iterator
// ever becomes invalid, it must be disposed of and cannot be reused.
valid bool
// creationTime provides the time at which the iterator was made.
creationTime time.Time
}
func (i *levigoIterator) Close() (err error) {
func (i levigoIterator) String() string {
var (
valid = "valid"
open = "open"
snapshotted = "snapshotted"
)
if i.closed {
open = "closed"
}
if !i.valid {
valid = "invalid"
}
if i.snapshot == nil {
snapshotted = "unsnapshotted"
}
return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted)
}
func (i *levigoIterator) Close() {
if i.closed {
return
}
@ -81,38 +108,49 @@ func (i *levigoIterator) Close() (err error) {
i.storage = nil
i.closed = true
i.valid = false
return
}
func (i levigoIterator) Seek(key []byte) (ok bool) {
func (i *levigoIterator) Seek(key []byte) bool {
i.iterator.Seek(key)
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) SeekToFirst() (ok bool) {
func (i *levigoIterator) SeekToFirst() bool {
i.iterator.SeekToFirst()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) SeekToLast() (ok bool) {
func (i *levigoIterator) SeekToLast() bool {
i.iterator.SeekToLast()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) Next() (ok bool) {
func (i *levigoIterator) Next() bool {
i.iterator.Next()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) Previous() (ok bool) {
func (i *levigoIterator) Previous() bool {
i.iterator.Prev()
return i.iterator.Valid()
i.valid = i.iterator.Valid()
return i.valid
}
func (i levigoIterator) Key() (key []byte) {
@ -166,7 +204,7 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
return
}
func (l *LevelDBPersistence) Close() (err error) {
func (l *LevelDBPersistence) Close() {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
defer func() {
@ -283,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
// will be leaked.
//
// The iterator is optionally snapshotable.
func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator {
func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator {
var (
snapshot *levigo.Snapshot
readOptions *levigo.ReadOptions
@ -299,7 +337,8 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator {
iterator = l.storage.NewIterator(l.readOptions)
}
return levigoIterator{
return &levigoIterator{
creationTime: time.Now(),
iterator: iterator,
readOptions: readOptions,
snapshot: snapshot,

View file

@ -68,12 +68,8 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory
defer t.Close()
p.tester.Fatal(err)
}
defer func() {
err = persistence.Close()
if err != nil {
p.tester.Fatal(err)
}
}()
defer persistence.Close()
for f.HasNext() {
key, value := f.Next()