Conditionalize disk initializations.

This commit conditionalizes the creation of the diskFrontier and
seriesFrontier along with the iterator such that they are only
provisioned once something is actually required from disk.
This commit is contained in:
Matt T. Proud 2013-05-22 19:06:06 +02:00 committed by Matt T. Proud
parent 97736a030a
commit fe41ce0b19
4 changed files with 66 additions and 69 deletions

View file

@ -81,7 +81,7 @@ type watermarkOperator struct {
curationState raw.Persistence curationState raw.Persistence
// diskFrontier models the available seekable ranges for the provided // diskFrontier models the available seekable ranges for the provided
// sampleIterator. // sampleIterator.
diskFrontier diskFrontier diskFrontier *diskFrontier
// ignoreYoungerThan is passed into the curation remark for the given series. // ignoreYoungerThan is passed into the curation remark for the given series.
ignoreYoungerThan time.Duration ignoreYoungerThan time.Duration
// processor is responsible for executing a given strategy on the // processor is responsible for executing a given strategy on the
@ -128,11 +128,11 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
iterator := samples.NewIterator(true) iterator := samples.NewIterator(true)
defer iterator.Close() defer iterator.Close()
diskFrontier, err := newDiskFrontier(iterator) diskFrontier, present, err := newDiskFrontier(iterator)
if err != nil { if err != nil {
return return
} }
if diskFrontier == nil { if !present {
// No sample database exists; no work to do! // No sample database exists; no work to do!
return return
} }
@ -153,7 +153,7 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
// begun for a given series. // begun for a given series.
operator := watermarkOperator{ operator := watermarkOperator{
curationState: curationState, curationState: curationState,
diskFrontier: *diskFrontier, diskFrontier: diskFrontier,
processor: processor, processor: processor,
ignoreYoungerThan: ignoreYoungerThan, ignoreYoungerThan: ignoreYoungerThan,
sampleIterator: iterator, sampleIterator: iterator,
@ -319,8 +319,8 @@ func (w watermarkFilter) curationConsistent(f *model.Fingerprint, watermark mode
func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) { func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
fingerprint := key.(*model.Fingerprint) fingerprint := key.(*model.Fingerprint)
seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator) seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
if err != nil || seriesFrontier == nil { if err != nil || !present {
// An anomaly with the series frontier is severe in the sense that some sort // An anomaly with the series frontier is severe in the sense that some sort
// of an illegal state condition exists in the storage layer, which would // of an illegal state condition exists in the storage layer, which would
// probably signify an illegal disk frontier. // probably signify an illegal disk frontier.

View file

@ -43,26 +43,21 @@ func (f diskFrontier) ContainsFingerprint(fingerprint *model.Fingerprint) bool {
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
} }
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) {
if !i.SeekToLast() || i.Key() == nil { if !i.SeekToLast() || i.Key() == nil {
return return nil, false, nil
} }
lastKey, err := extractSampleKey(i) lastKey, err := extractSampleKey(i)
if err != nil { if err != nil {
panic(fmt.Sprintln(err, i.Key(), i.Value())) return nil, false, err
} }
if !i.SeekToFirst() || i.Key() == nil { if !i.SeekToFirst() || i.Key() == nil {
return return nil, false, nil
} }
firstKey, err := extractSampleKey(i) firstKey, err := extractSampleKey(i)
if i.Key() == nil {
return
}
if err != nil { if err != nil {
panic(err) return nil, false, err
} }
d = &diskFrontier{} d = &diskFrontier{}
@ -72,7 +67,7 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
d.lastFingerprint = lastKey.Fingerprint d.lastFingerprint = lastKey.Fingerprint
d.lastSupertime = lastKey.FirstTimestamp d.lastSupertime = lastKey.FirstTimestamp
return return d, true, nil
} }
// seriesFrontier represents the valid seek frontier for a given series. // seriesFrontier represents the valid seek frontier for a given series.
@ -87,9 +82,8 @@ func (f seriesFrontier) String() string {
} }
// newSeriesFrontier furnishes a populated diskFrontier for a given // newSeriesFrontier furnishes a populated diskFrontier for a given
// fingerprint. A nil diskFrontier will be returned if the series cannot // fingerprint. If the series is absent, present will be false.
// be found in the store. func newSeriesFrontier(f *model.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) {
func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
lowerSeek := firstSupertime lowerSeek := firstSupertime
upperSeek := lastSupertime upperSeek := lastSupertime
@ -97,7 +91,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
// is outside of its seeking domain, there is no way that a seriesFrontier // is outside of its seeking domain, there is no way that a seriesFrontier
// could be materialized. Simply bail. // could be materialized. Simply bail.
if !d.ContainsFingerprint(f) { if !d.ContainsFingerprint(f) {
return return nil, false, nil
} }
// If we are either the first or the last key in the database, we need to use // If we are either the first or the last key in the database, we need to use
@ -119,7 +113,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
i.Seek(raw) i.Seek(raw)
if i.Key() == nil { if i.Key() == nil {
return return nil, false, fmt.Errorf("illegal condition: empty key")
} }
retrievedKey, err := extractSampleKey(i) retrievedKey, err := extractSampleKey(i)
@ -146,7 +140,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
// If the previous key does not match, we know that the requested // If the previous key does not match, we know that the requested
// fingerprint does not live in the database. // fingerprint does not live in the database.
if !retrievedFingerprint.Equal(f) { if !retrievedFingerprint.Equal(f) {
return return nil, false, nil
} }
} }
@ -170,7 +164,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
s.firstSupertime = retrievedKey.FirstTimestamp s.firstSupertime = retrievedKey.FirstTimestamp
return return s, true, nil
} }
// Contains indicates whether a given time value is within the recorded // Contains indicates whether a given time value is within the recorded

View file

@ -86,7 +86,7 @@ func (t testTieredStorageCloser) Close() {
func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) { func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) {
var directory test.TemporaryDirectory var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t) directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path()) storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 0, directory.Path())
if err != nil { if err != nil {
if storage != nil { if storage != nil {

View file

@ -15,15 +15,17 @@ package metric
import ( import (
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"log" "log"
"sort" "sort"
"sync" "sync"
"time" "time"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/raw/leveldb"
) )
type chunk model.Values type chunk model.Values
@ -58,8 +60,6 @@ type TieredStorage struct {
appendToDiskQueue chan model.Samples appendToDiskQueue chan model.Samples
diskFrontier *diskFrontier
memoryArena *memorySeriesStorage memoryArena *memorySeriesStorage
memoryTTL time.Duration memoryTTL time.Duration
flushMemoryInterval time.Duration flushMemoryInterval time.Duration
@ -151,21 +151,6 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
} }
} }
func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
return
}
return
}
// Starts serving requests. // Starts serving requests.
func (t *TieredStorage) Serve() { func (t *TieredStorage) Serve() {
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval) flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
@ -277,25 +262,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
scans := viewJob.builder.ScanJobs() scans := viewJob.builder.ScanJobs()
view := newView() view := newView()
// Get a single iterator that will be used for all data extraction below.
iterator := t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
// Rebuilding of the frontier should happen on a conditional basis if a var iterator leveldb.Iterator = nil
// (fingerprint, timestamp) tuple is outside of the current frontier. var diskFrontier *diskFrontier = nil
err = t.rebuildDiskFrontier(iterator) var seriesPresent = true
if err != nil { var diskPresent = true
panic(err)
}
for _, scanJob := range scans { for _, scanJob := range scans {
var seriesFrontier *seriesFrontier = nil var seriesFrontier *seriesFrontier = nil
if t.diskFrontier != nil {
seriesFrontier, err = newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
if err != nil {
panic(err)
}
}
standingOps := scanJob.operations standingOps := scanJob.operations
memValues := t.memoryArena.CloneSamples(scanJob.fingerprint) memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
@ -311,15 +285,44 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
currentChunk := chunk{} currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk. // If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent {
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) // Conditionalize disk access.
if diskFrontier == nil && diskPresent {
if iterator == nil {
// Get a single iterator that will be used for all data extraction
// below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
}
// If we aimed past the newest value on disk, combine it with the next value from memory. diskFrontier, diskPresent, err = newDiskFrontier(iterator)
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { if err != nil {
latestDiskValue := diskValues[len(diskValues)-1:] panic(err)
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) }
if !diskPresent {
seriesPresent = false
}
}
if seriesFrontier == nil && diskPresent {
seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator)
if err != nil {
panic(err)
}
}
if diskPresent && seriesPresent {
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
} else { } else {
currentChunk = chunk(diskValues) currentChunk = chunk(memValues)
} }
} else { } else {
currentChunk = chunk(memValues) currentChunk = chunk(memValues)