Merge pull request #269 from prometheus/feature/view-tracing

Conditionalize disk initializations.
This commit is contained in:
juliusv 2013-06-04 04:25:16 -07:00
commit 6c36beb764
4 changed files with 66 additions and 69 deletions

View file

@ -81,7 +81,7 @@ type watermarkOperator struct {
curationState raw.Persistence
// diskFrontier models the available seekable ranges for the provided
// sampleIterator.
diskFrontier diskFrontier
diskFrontier *diskFrontier
// ignoreYoungerThan is passed into the curation remark for the given series.
ignoreYoungerThan time.Duration
// processor is responsible for executing a given stategy on the
@ -128,11 +128,11 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
iterator := samples.NewIterator(true)
defer iterator.Close()
diskFrontier, err := newDiskFrontier(iterator)
diskFrontier, present, err := newDiskFrontier(iterator)
if err != nil {
return
}
if diskFrontier == nil {
if !present {
// No sample database exists; no work to do!
return
}
@ -153,7 +153,7 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
// begun for a given series.
operator := watermarkOperator{
curationState: curationState,
diskFrontier: *diskFrontier,
diskFrontier: diskFrontier,
processor: processor,
ignoreYoungerThan: ignoreYoungerThan,
sampleIterator: iterator,
@ -319,8 +319,8 @@ func (w watermarkFilter) curationConsistent(f *model.Fingerprint, watermark mode
func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
fingerprint := key.(*model.Fingerprint)
seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
if err != nil || seriesFrontier == nil {
seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
if err != nil || !present {
// An anomaly with the series frontier is severe in the sense that some sort
// of an illegal state condition exists in the storage layer, which would
// probably signify an illegal disk frontier.

View file

@ -43,26 +43,21 @@ func (f diskFrontier) ContainsFingerprint(fingerprint *model.Fingerprint) bool {
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
}
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) {
if !i.SeekToLast() || i.Key() == nil {
return
return nil, false, nil
}
lastKey, err := extractSampleKey(i)
if err != nil {
panic(fmt.Sprintln(err, i.Key(), i.Value()))
return nil, false, err
}
if !i.SeekToFirst() || i.Key() == nil {
return
return nil, false, nil
}
firstKey, err := extractSampleKey(i)
if i.Key() == nil {
return
}
if err != nil {
panic(err)
return nil, false, err
}
d = &diskFrontier{}
@ -72,7 +67,7 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
d.lastFingerprint = lastKey.Fingerprint
d.lastSupertime = lastKey.FirstTimestamp
return
return d, true, nil
}
// seriesFrontier represents the valid seek frontier for a given series.
@ -87,9 +82,8 @@ func (f seriesFrontier) String() string {
}
// newSeriesFrontier furnishes a populated diskFrontier for a given
// fingerprint. A nil diskFrontier will be returned if the series cannot
// be found in the store.
func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
// fingerprint. If the series is absent, present will be false.
func newSeriesFrontier(f *model.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) {
lowerSeek := firstSupertime
upperSeek := lastSupertime
@ -97,7 +91,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
// is outside of its seeking domain, there is no way that a seriesFrontier
// could be materialized. Simply bail.
if !d.ContainsFingerprint(f) {
return
return nil, false, nil
}
// If we are either the first or the last key in the database, we need to use
@ -119,7 +113,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
i.Seek(raw)
if i.Key() == nil {
return
return nil, false, fmt.Errorf("illegal condition: empty key")
}
retrievedKey, err := extractSampleKey(i)
@ -146,7 +140,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
// If the previous key does not match, we know that the requested
// fingerprint does not live in the database.
if !retrievedFingerprint.Equal(f) {
return
return nil, false, nil
}
}
@ -170,7 +164,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator)
s.firstSupertime = retrievedKey.FirstTimestamp
return
return s, true, nil
}
// Contains indicates whether a given time value is within the recorded

View file

@ -86,7 +86,7 @@ func (t testTieredStorageCloser) Close() {
func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path())
storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 0, directory.Path())
if err != nil {
if storage != nil {

View file

@ -15,15 +15,17 @@ package metric
import (
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"log"
"sort"
"sync"
"time"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
type chunk model.Values
@ -58,8 +60,6 @@ type TieredStorage struct {
appendToDiskQueue chan model.Samples
diskFrontier *diskFrontier
memoryArena *memorySeriesStorage
memoryTTL time.Duration
flushMemoryInterval time.Duration
@ -151,21 +151,6 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
}
}
func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
return
}
return
}
// Starts serving requests.
func (t *TieredStorage) Serve() {
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
@ -277,25 +262,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
scans := viewJob.builder.ScanJobs()
view := newView()
// Get a single iterator that will be used for all data extraction below.
iterator := t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
err = t.rebuildDiskFrontier(iterator)
if err != nil {
panic(err)
}
var iterator leveldb.Iterator = nil
var diskFrontier *diskFrontier = nil
var diskPresent = true
for _, scanJob := range scans {
var seriesFrontier *seriesFrontier = nil
if t.diskFrontier != nil {
seriesFrontier, err = newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
if err != nil {
panic(err)
}
}
var seriesPresent = true
standingOps := scanJob.operations
memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
@ -311,15 +285,44 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent {
// Conditionalize disk access.
if diskFrontier == nil && diskPresent {
if iterator == nil {
// Get a single iterator that will be used for all data extraction
// below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
}
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
diskFrontier, diskPresent, err = newDiskFrontier(iterator)
if err != nil {
panic(err)
}
if !diskPresent {
seriesPresent = false
}
}
if seriesFrontier == nil && diskPresent {
seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator)
if err != nil {
panic(err)
}
}
if diskPresent && seriesPresent {
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
} else {
currentChunk = chunk(diskValues)
currentChunk = chunk(memValues)
}
} else {
currentChunk = chunk(memValues)