diff --git a/storage/metric/curator.go b/storage/metric/curator.go index e7bc51692..d9920cea8 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -81,7 +81,7 @@ type watermarkOperator struct { curationState raw.Persistence // diskFrontier models the available seekable ranges for the provided // sampleIterator. - diskFrontier diskFrontier + diskFrontier *diskFrontier // ignoreYoungerThan is passed into the curation remark for the given series. ignoreYoungerThan time.Duration // processor is responsible for executing a given stategy on the @@ -128,11 +128,11 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process iterator := samples.NewIterator(true) defer iterator.Close() - diskFrontier, err := newDiskFrontier(iterator) + diskFrontier, present, err := newDiskFrontier(iterator) if err != nil { return } - if diskFrontier == nil { + if !present { // No sample database exists; no work to do! return } @@ -153,7 +153,7 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process // begun for a given series. operator := watermarkOperator{ curationState: curationState, - diskFrontier: *diskFrontier, + diskFrontier: diskFrontier, processor: processor, ignoreYoungerThan: ignoreYoungerThan, sampleIterator: iterator, @@ -319,8 +319,8 @@ func (w watermarkFilter) curationConsistent(f *model.Fingerprint, watermark mode func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) { fingerprint := key.(*model.Fingerprint) - seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator) - if err != nil || seriesFrontier == nil { + seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator) + if err != nil || !present { // An anomaly with the series frontier is severe in the sense that some sort // of an illegal state condition exists in the storage layer, which would // probably signify an illegal disk frontier. diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 033f1c570..dbf084661 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -43,26 +43,21 @@ func (f diskFrontier) ContainsFingerprint(fingerprint *model.Fingerprint) bool { return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) } -func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { - +func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) { if !i.SeekToLast() || i.Key() == nil { - return + return nil, false, nil } - lastKey, err := extractSampleKey(i) if err != nil { - panic(fmt.Sprintln(err, i.Key(), i.Value())) + return nil, false, err } if !i.SeekToFirst() || i.Key() == nil { - return + return nil, false, nil } firstKey, err := extractSampleKey(i) - if i.Key() == nil { - return - } if err != nil { - panic(err) + return nil, false, err } d = &diskFrontier{} @@ -72,7 +67,7 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) { d.lastFingerprint = lastKey.Fingerprint d.lastSupertime = lastKey.FirstTimestamp - return + return d, true, nil } // seriesFrontier represents the valid seek frontier for a given series. @@ -87,9 +82,8 @@ func (f seriesFrontier) String() string { } // newSeriesFrontier furnishes a populated diskFrontier for a given -// fingerprint. A nil diskFrontier will be returned if the series cannot -// be found in the store. -func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) { +// fingerprint. If the series is absent, present will be false. +func newSeriesFrontier(f *model.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) { lowerSeek := firstSupertime upperSeek := lastSupertime @@ -97,7 +91,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) // is outside of its seeking domain, there is no way that a seriesFrontier // could be materialized. Simply bail. if !d.ContainsFingerprint(f) { - return + return nil, false, nil } // If we are either the first or the last key in the database, we need to use @@ -119,7 +113,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) i.Seek(raw) if i.Key() == nil { - return + return nil, false, fmt.Errorf("illegal condition: empty key") } retrievedKey, err := extractSampleKey(i) @@ -146,7 +140,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) // If the previous key does not match, we know that the requested // fingerprint does not live in the database. if !retrievedFingerprint.Equal(f) { - return + return nil, false, nil } } @@ -170,7 +164,7 @@ func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) s.firstSupertime = retrievedKey.FirstTimestamp - return + return s, true, nil } // Contains indicates whether a given time value is within the recorded diff --git a/storage/metric/helpers_test.go b/storage/metric/helpers_test.go index 023e782b6..43405a55a 100644 --- a/storage/metric/helpers_test.go +++ b/storage/metric/helpers_test.go @@ -86,7 +86,7 @@ func (t testTieredStorageCloser) Close() { func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) { var directory test.TemporaryDirectory directory = test.NewTemporaryDirectory("test_tiered_storage", t) - storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path()) + storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 0, directory.Path()) if err != nil { if storage != nil { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 0af4538e3..21e295410 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -15,15 +15,17 @@ package metric import ( "fmt" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/coding/indexable" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage/raw/leveldb" "log" "sort" "sync" "time" + + dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/raw/leveldb" ) type chunk model.Values @@ -58,8 +60,6 @@ type TieredStorage struct { appendToDiskQueue chan model.Samples - diskFrontier *diskFrontier - memoryArena *memorySeriesStorage memoryTTL time.Duration flushMemoryInterval time.Duration @@ -151,21 +151,6 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat } } -func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { - begin := time.Now() - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) - }() - - t.diskFrontier, err = newDiskFrontier(i) - if err != nil { - return - } - return -} - // Starts serving requests. func (t *TieredStorage) Serve() { flushMemoryTicker := time.NewTicker(t.flushMemoryInterval) @@ -277,25 +262,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) { scans := viewJob.builder.ScanJobs() view := newView() - // Get a single iterator that will be used for all data extraction below. - iterator := t.DiskStorage.MetricSamples.NewIterator(true) - defer iterator.Close() - // Rebuilding of the frontier should happen on a conditional basis if a - // (fingerprint, timestamp) tuple is outside of the current frontier. - err = t.rebuildDiskFrontier(iterator) - if err != nil { - panic(err) - } + var iterator leveldb.Iterator = nil + var diskFrontier *diskFrontier = nil + var seriesPresent = true + var diskPresent = true for _, scanJob := range scans { var seriesFrontier *seriesFrontier = nil - if t.diskFrontier != nil { - seriesFrontier, err = newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) - if err != nil { - panic(err) - } - } standingOps := scanJob.operations memValues := t.memoryArena.CloneSamples(scanJob.fingerprint) @@ -311,15 +285,44 @@ func (t *TieredStorage) renderView(viewJob viewJob) { currentChunk := chunk{} // If we aimed before the oldest value in memory, load more data from disk. - if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { - diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) + if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent { + // Conditionalize disk access. + if diskFrontier == nil && diskPresent { + if iterator == nil { + // Get a single iterator that will be used for all data extraction + // below. + iterator = t.DiskStorage.MetricSamples.NewIterator(true) + defer iterator.Close() + } - // If we aimed past the newest value on disk, combine it with the next value from memory. - if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { - latestDiskValue := diskValues[len(diskValues)-1:] - currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + diskFrontier, diskPresent, err = newDiskFrontier(iterator) + if err != nil { + panic(err) + } + if !diskPresent { + seriesPresent = false + } + } + + if seriesFrontier == nil && diskPresent { + seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator) + if err != nil { + panic(err) + } + } + + if diskPresent && seriesPresent { + diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) + + // If we aimed past the newest value on disk, combine it with the next value from memory. + if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { + latestDiskValue := diskValues[len(diskValues)-1:] + currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + } else { + currentChunk = chunk(diskValues) + } } else { - currentChunk = chunk(diskValues) + currentChunk = chunk(memValues) } } else { currentChunk = chunk(memValues)