From 2b42fd00687fd0a58fea748de5cd2219ae4eeb86 Mon Sep 17 00:00:00 2001
From: "Matt T. Proud" <matt.proud@gmail.com>
Date: Thu, 22 Aug 2013 17:40:23 +0200
Subject: [PATCH] Snapshot of no more frontier.

Change-Id: Icd52da3f52bfe4529829ea70b4865ed7c9f6c446
---
 Makefile                    |   2 +-
 storage/metric/curator.go   |  89 +++++++++------
 storage/metric/frontier.go  | 196 ---------------------------------
 storage/metric/processor.go |   6 +-
 storage/metric/samplekey.go |  46 +++++---
 storage/metric/tiered.go    | 214 +++++++++++++++++-------------------
 6 files changed, 195 insertions(+), 358 deletions(-)
 delete mode 100644 storage/metric/frontier.go

diff --git a/Makefile b/Makefile
index 21777855dc..0355958c20 100644
--- a/Makefile
+++ b/Makefile
@@ -89,7 +89,7 @@ $(FULL_GOPATH):
 	-[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; }
 	[ -d "$(FULL_GOPATH)" ]
 
-test: build
+test: config dependencies model preparation tools web
 	$(GO) test $(GO_TEST_FLAGS) ./...
 
 tools: dependencies preparation
diff --git a/storage/metric/curator.go b/storage/metric/curator.go
index 513c64e360..3fdf2a5a3c 100644
--- a/storage/metric/curator.go
+++ b/storage/metric/curator.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"code.google.com/p/goprotobuf/proto"
+	"github.com/golang/glog"
 
 	clientmodel "github.com/prometheus/client_golang/model"
 
@@ -70,9 +71,6 @@ type Curator struct {
 type watermarkScanner struct {
 	// curationState is the data store for curation remarks.
 	curationState CurationRemarker
-	// diskFrontier models the available seekable ranges for the provided
-	// sampleIterator.
-	diskFrontier *diskFrontier
 	// ignoreYoungerThan is passed into the curation remark for the given series.
 	ignoreYoungerThan time.Duration
 	// processor is responsible for executing a given stategy on the
@@ -89,6 +87,8 @@ type watermarkScanner struct {
 	stop chan bool
 	// status is the outbound channel for notifying the status page of its state.
 	status CurationStateUpdater
+
+	firstBlock, lastBlock *SampleKey
 }
 
 // run facilitates the curation lifecycle.
@@ -119,14 +119,19 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
 	iterator := samples.NewIterator(true)
 	defer iterator.Close()
 
-	diskFrontier, present, err := newDiskFrontier(iterator)
-	if err != nil {
+	if !iterator.SeekToLast() {
+		glog.Info("Empty database; skipping curation.")
+
 		return
 	}
-	if !present {
-		// No sample database exists; no work to do!
+	lastBlock, _ := extractSampleKey(iterator)
+
+	if !iterator.SeekToFirst() {
+		glog.Info("Empty database; skipping curation.")
+
 		return
 	}
+	firstBlock, _ := extractSampleKey(iterator)
 
 	scanner := &watermarkScanner{
 		curationState:     curationState,
@@ -136,9 +141,11 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
 		stop:              c.Stop,
 		stopAt:            instant.Add(-1 * ignoreYoungerThan),
 
-		diskFrontier:   diskFrontier,
 		sampleIterator: iterator,
 		samples:        samples,
+
+		firstBlock: firstBlock,
+		lastBlock:  lastBlock,
 	}
 
 	// Right now, the ability to stop a curation is limited to the beginning of
@@ -271,12 +278,11 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
 func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
 	fingerprint := key.(*clientmodel.Fingerprint)
 
-	seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
-	if err != nil || !present {
-		// An anomaly with the series frontier is severe in the sense that some sort
-		// of an illegal state condition exists in the storage layer, which would
-		// probably signify an illegal disk frontier.
-		return &storage.OperatorError{error: err, Continuable: false}
+	if fingerprint.Less(w.firstBlock.Fingerprint) {
+		return nil
+	}
+	if w.lastBlock.Fingerprint.Less(fingerprint) {
+		return nil
 	}
 
 	curationState, present, err := w.curationState.Get(&curationKey{
@@ -285,7 +291,6 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
 		ProcessorMessageTypeName: w.processor.Name(),
 		IgnoreYoungerThan:        w.ignoreYoungerThan,
 	})
-
 	if err != nil {
 		// An anomaly with the curation remark is likely not fatal in the sense that
 		// there was a decoding error with the entity and shouldn't be cause to stop
@@ -293,23 +298,21 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
 		// work forward.  With an idempotent processor, this is safe.
 		return &storage.OperatorError{error: err, Continuable: true}
 	}
-	var firstSeek time.Time
-	switch {
-	case !present, seriesFrontier.After(curationState):
-		firstSeek = seriesFrontier.firstSupertime
-	case !seriesFrontier.InSafeSeekRange(curationState):
-		firstSeek = seriesFrontier.lastSupertime
-	default:
-		firstSeek = curationState
+
+	keySet := &SampleKey{
+		Fingerprint: fingerprint,
 	}
 
-	startKey := &SampleKey{
-		Fingerprint:    fingerprint,
-		FirstTimestamp: firstSeek,
+	if !present && fingerprint.Equal(w.firstBlock.Fingerprint) {
+		// If the fingerprint is the same, then we simply need to use the earliest
+		// block found in the database.
+		*keySet = *w.firstBlock
+	} else if present {
+		keySet.FirstTimestamp = curationState
 	}
+
 	dto := new(dto.SampleKey)
-
-	startKey.Dump(dto)
+	keySet.Dump(dto)
 	prospectiveKey := coding.NewPBEncoder(dto).MustEncode()
 	if !w.sampleIterator.Seek(prospectiveKey) {
 		// LevelDB is picky about the seek ranges.  If an iterator was invalidated,
@@ -317,25 +320,41 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
 		return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
 	}
 
-	newestAllowedSample := w.stopAt
-	if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
-		newestAllowedSample = seriesFrontier.lastSupertime
+	for {
+		sampleKey, err := extractSampleKey(w.sampleIterator)
+		if err != nil {
+			return
+		}
+		if !sampleKey.Fingerprint.Equal(fingerprint) {
+			return
+		}
+
+		if !present {
+			break
+		}
+
+		if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) {
+			break
+		}
+
+		if !w.sampleIterator.Next() {
+			return
+		}
 	}
 
-	lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
+	lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
 	if err != nil {
 		// We can't divine the severity of a processor error without refactoring the
 		// interface.
 		return &storage.OperatorError{error: err, Continuable: false}
 	}
 
-	err = w.curationState.Update(&curationKey{
+	if err = w.curationState.Update(&curationKey{
 		Fingerprint:              fingerprint,
 		ProcessorMessageRaw:      w.processor.Signature(),
 		ProcessorMessageTypeName: w.processor.Name(),
 		IgnoreYoungerThan:        w.ignoreYoungerThan,
-	}, lastTime)
-	if err != nil {
+	}, lastTime); err != nil {
 		// Under the assumption that the processors are idempotent, they can be
 		// re-run; thusly, the commitment of the curation remark is no cause
 		// to cease further progress.
diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go
deleted file mode 100644
index b61520ab24..0000000000
--- a/storage/metric/frontier.go
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright 2013 Prometheus Team
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package metric
-
-import (
-	"fmt"
-	"time"
-
-	clientmodel "github.com/prometheus/client_golang/model"
-
-	"github.com/prometheus/prometheus/coding"
-	"github.com/prometheus/prometheus/coding/indexable"
-	"github.com/prometheus/prometheus/storage/raw/leveldb"
-
-	dto "github.com/prometheus/prometheus/model/generated"
-)
-
-// diskFrontier describes an on-disk store of series to provide a
-// representation of the known keyspace and time series values available.
-//
-// This is used to reduce the burden associated with LevelDB iterator
-// management.
-type diskFrontier struct {
-	firstFingerprint *clientmodel.Fingerprint
-	firstSupertime   time.Time
-	lastFingerprint  *clientmodel.Fingerprint
-	lastSupertime    time.Time
-}
-
-func (f diskFrontier) String() string {
-	return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint, f.firstSupertime, f.lastFingerprint, f.lastSupertime)
-}
-
-func (f *diskFrontier) ContainsFingerprint(fingerprint *clientmodel.Fingerprint) bool {
-	return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
-}
-
-func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) {
-	if !i.SeekToLast() || i.Key() == nil {
-		return nil, false, nil
-	}
-	lastKey, err := extractSampleKey(i)
-	if err != nil {
-		return nil, false, err
-	}
-
-	if !i.SeekToFirst() || i.Key() == nil {
-		return nil, false, nil
-	}
-	firstKey, err := extractSampleKey(i)
-	if err != nil {
-		return nil, false, err
-	}
-
-	return &diskFrontier{
-		firstFingerprint: firstKey.Fingerprint,
-		firstSupertime:   firstKey.FirstTimestamp,
-		lastFingerprint:  lastKey.Fingerprint,
-		lastSupertime:    lastKey.FirstTimestamp,
-	}, true, nil
-}
-
-// seriesFrontier represents the valid seek frontier for a given series.
-type seriesFrontier struct {
-	firstSupertime time.Time
-	lastSupertime  time.Time
-	lastTime       time.Time
-}
-
-func (f *seriesFrontier) String() string {
-	return fmt.Sprintf("seriesFrontier from %s to %s at %s", f.firstSupertime, f.lastSupertime, f.lastTime)
-}
-
-// newSeriesFrontier furnishes a populated diskFrontier for a given
-// fingerprint.  If the series is absent, present will be false.
-func newSeriesFrontier(f *clientmodel.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) {
-	lowerSeek := firstSupertime
-	upperSeek := lastSupertime
-
-	// If the diskFrontier for this iterator says that the candidate fingerprint
-	// is outside of its seeking domain, there is no way that a seriesFrontier
-	// could be materialized.  Simply bail.
-	if !d.ContainsFingerprint(f) {
-		return nil, false, nil
-	}
-
-	// If we are either the first or the last key in the database, we need to use
-	// pessimistic boundary frontiers.
-	if f.Equal(d.firstFingerprint) {
-		lowerSeek = indexable.EncodeTime(d.firstSupertime)
-	}
-	if f.Equal(d.lastFingerprint) {
-		upperSeek = indexable.EncodeTime(d.lastSupertime)
-	}
-
-	// TODO: Convert this to SampleKey.ToPartialDTO.
-	fp := new(dto.Fingerprint)
-	dumpFingerprint(fp, f)
-	key := &dto.SampleKey{
-		Fingerprint: fp,
-		Timestamp:   upperSeek,
-	}
-
-	raw := coding.NewPBEncoder(key).MustEncode()
-	i.Seek(raw)
-
-	if i.Key() == nil {
-		return nil, false, fmt.Errorf("illegal condition: empty key")
-	}
-
-	retrievedKey, err := extractSampleKey(i)
-	if err != nil {
-		panic(err)
-	}
-
-	retrievedFingerprint := retrievedKey.Fingerprint
-
-	// The returned fingerprint may not match if the original seek key lives
-	// outside of a metric's frontier.  This is probable, for we are seeking to
-	// to the maximum allowed time, which could advance us to the next
-	// fingerprint.
-	//
-	//
-	if !retrievedFingerprint.Equal(f) {
-		i.Previous()
-
-		retrievedKey, err = extractSampleKey(i)
-		if err != nil {
-			panic(err)
-		}
-		retrievedFingerprint := retrievedKey.Fingerprint
-		// If the previous key does not match, we know that the requested
-		// fingerprint does not live in the database.
-		if !retrievedFingerprint.Equal(f) {
-			return nil, false, nil
-		}
-	}
-
-	s = &seriesFrontier{
-		lastSupertime: retrievedKey.FirstTimestamp,
-		lastTime:      retrievedKey.LastTimestamp,
-	}
-
-	key.Timestamp = lowerSeek
-
-	raw = coding.NewPBEncoder(key).MustEncode()
-
-	i.Seek(raw)
-
-	retrievedKey, err = extractSampleKey(i)
-	if err != nil {
-		panic(err)
-	}
-
-	retrievedFingerprint = retrievedKey.Fingerprint
-
-	s.firstSupertime = retrievedKey.FirstTimestamp
-
-	return s, true, nil
-}
-
-// Contains indicates whether a given time value is within the recorded
-// interval.
-func (s *seriesFrontier) Contains(t time.Time) bool {
-	return !(t.Before(s.firstSupertime) || t.After(s.lastTime))
-}
-
-// InSafeSeekRange indicates whether the time is within the recorded time range
-// and is safely seekable such that a seek does not result in an iterator point
-// after the last value of the series or outside of the entire store.
-func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
-	if !s.Contains(t) {
-		return
-	}
-
-	if s.lastSupertime.Before(t) {
-		return
-	}
-
-	return true
-}
-
-func (s *seriesFrontier) After(t time.Time) bool {
-	return s.firstSupertime.After(t)
-}
diff --git a/storage/metric/processor.go b/storage/metric/processor.go
index 001086d9f4..8310325550 100644
--- a/storage/metric/processor.go
+++ b/storage/metric/processor.go
@@ -107,12 +107,13 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
 	if err != nil {
 		return
 	}
+
 	unactedSamples, err = extractSampleValues(sampleIterator)
 	if err != nil {
 		return
 	}
 
-	for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
+	for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
 		switch {
 		// Furnish a new pending batch operation if none is available.
 		case pendingBatch == nil:
@@ -289,6 +290,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
 	if err != nil {
 		return
 	}
+
 	sampleValues, err := extractSampleValues(sampleIterator)
 	if err != nil {
 		return
@@ -296,7 +298,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
 
 	pendingMutations := 0
 
-	for lastCurated.Before(stopAt) {
+	for lastCurated.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
 		switch {
 		// Furnish a new pending batch operation if none is available.
 		case pendingBatch == nil:
diff --git a/storage/metric/samplekey.go b/storage/metric/samplekey.go
index afcfad4932..e76133fba7 100644
--- a/storage/metric/samplekey.go
+++ b/storage/metric/samplekey.go
@@ -35,6 +35,24 @@ type SampleKey struct {
 	SampleCount    uint32
 }
 
+func (s *SampleKey) Equal(o *SampleKey) bool {
+	if s == o {
+		return true
+	}
+
+	if !s.Fingerprint.Equal(o.Fingerprint) {
+		return false
+	}
+	if !s.FirstTimestamp.Equal(o.FirstTimestamp) {
+		return false
+	}
+	if !s.LastTimestamp.Equal(o.LastTimestamp) {
+		return false
+	}
+
+	return s.SampleCount == o.SampleCount
+}
+
 // MayContain indicates whether the given SampleKey could potentially contain a
 // value at the provided time.  Even if true is emitted, that does not mean a
 // satisfactory value, in fact, exists.
@@ -49,6 +67,21 @@ func (s *SampleKey) MayContain(t time.Time) bool {
 	}
 }
 
+func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t time.Time) bool {
+	if s.Fingerprint.Less(fp) {
+		return true
+	}
+	if !s.Fingerprint.Equal(fp) {
+		return false
+	}
+
+	if s.FirstTimestamp.Before(t) {
+		return true
+	}
+
+	return s.LastTimestamp.Before(t)
+}
+
 // ToDTO converts this SampleKey into a DTO for use in serialization purposes.
 func (s *SampleKey) Dump(d *dto.SampleKey) {
 	d.Reset()
@@ -61,19 +94,6 @@ func (s *SampleKey) Dump(d *dto.SampleKey) {
 	d.SampleCount = proto.Uint32(s.SampleCount)
 }
 
-// ToPartialDTO converts this SampleKey into a DTO that is only suitable for
-// database exploration purposes for a given (Fingerprint, First Sample Time)
-// tuple.
-func (s *SampleKey) FOOdumpPartial(d *dto.SampleKey) {
-	d.Reset()
-
-	f := &dto.Fingerprint{}
-	dumpFingerprint(f, s.Fingerprint)
-
-	d.Fingerprint = f
-	d.Timestamp = indexable.EncodeTime(s.FirstTimestamp)
-}
-
 func (s *SampleKey) String() string {
 	return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
 }
diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go
index 73f67d86e3..23de17022e 100644
--- a/storage/metric/tiered.go
+++ b/storage/metric/tiered.go
@@ -24,7 +24,6 @@ import (
 	clientmodel "github.com/prometheus/client_golang/model"
 
 	"github.com/prometheus/prometheus/coding"
-	"github.com/prometheus/prometheus/coding/indexable"
 	"github.com/prometheus/prometheus/stats"
 	"github.com/prometheus/prometheus/storage/raw/leveldb"
 	"github.com/prometheus/prometheus/utility"
@@ -90,7 +89,6 @@ type TieredStorage struct {
 	state tieredStorageState
 
 	memorySemaphore chan bool
-	diskSemaphore   chan bool
 
 	wmCache *watermarkCache
 
@@ -107,7 +105,6 @@ type viewJob struct {
 }
 
 const (
-	tieredDiskSemaphores   = 1
 	tieredMemorySemaphores = 5
 )
 
@@ -136,15 +133,11 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
 		memoryTTL:           memoryTTL,
 		viewQueue:           make(chan viewJob, viewQueueDepth),
 
-		diskSemaphore:   make(chan bool, tieredDiskSemaphores),
 		memorySemaphore: make(chan bool, tieredMemorySemaphores),
 
 		wmCache: wmCache,
 	}
 
-	for i := 0; i < tieredDiskSemaphores; i++ {
-		s.diskSemaphore <- true
-	}
 	for i := 0; i < tieredMemorySemaphores; i++ {
 		s.memorySemaphore <- true
 	}
@@ -370,9 +363,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 	scanJobsTimer.Stop()
 	view := newView()
 
-	var iterator leveldb.Iterator = nil
-	var diskFrontier *diskFrontier = nil
-	var diskPresent = true
+	var iterator leveldb.Iterator
+	diskPresent := true
+	var firstBlock, lastBlock *SampleKey
 
 	extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
 	for _, scanJob := range scans {
@@ -385,9 +378,6 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 			continue
 		}
 
-		var seriesFrontier *seriesFrontier = nil
-		var seriesPresent = true
-
 		standingOps := scanJob.operations
 		memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
 
@@ -402,50 +392,40 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 
 			currentChunk := chunk{}
 			// If we aimed before the oldest value in memory, load more data from disk.
-			if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent {
-				diskPrepareTimer := viewJob.stats.GetTimer(stats.ViewDiskPreparationTime).Start()
-				// Conditionalize disk access.
-				if diskFrontier == nil && diskPresent {
-					if iterator == nil {
-						<-t.diskSemaphore
-						defer func() {
-							t.diskSemaphore <- true
-						}()
-
-						// Get a single iterator that will be used for all data extraction
-						// below.
-						iterator = t.DiskStorage.MetricSamples.NewIterator(true)
-						defer iterator.Close()
-					}
-
-					diskFrontier, diskPresent, err = newDiskFrontier(iterator)
-					if err != nil {
-						panic(err)
-					}
-					if !diskPresent {
-						seriesPresent = false
+			if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
+				if iterator == nil {
+					// Get a single iterator that will be used for all data extraction
+					// below.
+					iterator = t.DiskStorage.MetricSamples.NewIterator(true)
+					defer iterator.Close()
+					if diskPresent = iterator.SeekToLast(); diskPresent {
+						lastBlock, _ = extractSampleKey(iterator)
+						if !iterator.SeekToFirst() {
+							diskPresent = false
+						} else {
+							firstBlock, _ = extractSampleKey(iterator)
+						}
 					}
 				}
 
-				if seriesFrontier == nil && diskPresent {
-					seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator)
-					if err != nil {
-						panic(err)
-					}
-				}
-				diskPrepareTimer.Stop()
-
-				if diskPresent && seriesPresent {
+				if diskPresent {
 					diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
-					diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
+					diskValues, expired := t.loadChunkAroundTime(iterator, scanJob.fingerprint, targetTime, firstBlock, lastBlock)
+					if expired {
+						diskPresent = false
+					}
 					diskTimer.Stop()
 
 					// If we aimed past the newest value on disk, combine it with the next value from memory.
-					if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
-						latestDiskValue := diskValues[len(diskValues)-1:]
-						currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
+					if len(diskValues) == 0 {
+						currentChunk = chunk(memValues)
 					} else {
-						currentChunk = chunk(diskValues)
+						if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
+							latestDiskValue := diskValues[len(diskValues)-1:]
+							currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
+						} else {
+							currentChunk = chunk(diskValues)
+						}
 					}
 				} else {
 					currentChunk = chunk(memValues)
@@ -513,81 +493,93 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 	return
 }
 
-func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint *clientmodel.Fingerprint, ts time.Time) (chunk Values) {
-
-	fd := &dto.Fingerprint{}
-	dumpFingerprint(fd, fingerprint)
-	targetKey := &dto.SampleKey{
-		Fingerprint: fd,
+func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts time.Time, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) {
+	if fingerprint.Less(firstBlock.Fingerprint) {
+		return nil, false
 	}
+	if lastBlock.Fingerprint.Less(fingerprint) {
+		return nil, true
+	}
+
+	seekingKey := &SampleKey{
+		Fingerprint: fingerprint,
+	}
+
+	if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) {
+		seekingKey.FirstTimestamp = firstBlock.FirstTimestamp
+	} else if fingerprint.Equal(lastBlock.Fingerprint) && ts.After(lastBlock.FirstTimestamp) {
+		seekingKey.FirstTimestamp = lastBlock.FirstTimestamp
+	} else {
+		seekingKey.FirstTimestamp = ts
+	}
+
+	fd := new(dto.SampleKey)
+	seekingKey.Dump(fd)
+
+	// Try seeking to target key.
+	rawKey := coding.NewPBEncoder(fd).MustEncode()
+	if !iterator.Seek(rawKey) {
+		return chunk, true
+	}
+
 	var foundKey *SampleKey
 	var foundValues Values
 
-	// Limit the target key to be within the series' keyspace.
-	if ts.After(frontier.lastSupertime) {
-		targetKey.Timestamp = indexable.EncodeTime(frontier.lastSupertime)
-	} else {
-		targetKey.Timestamp = indexable.EncodeTime(ts)
-	}
+	foundKey, _ = extractSampleKey(iterator)
 
-	// Try seeking to target key.
-	rawKey := coding.NewPBEncoder(targetKey).MustEncode()
-	iterator.Seek(rawKey)
+	if foundKey.Fingerprint.Equal(fingerprint) {
+		// Figure out if we need to rewind by one block.
+		// Imagine the following supertime blocks with time ranges:
+		//
+		// Block 1: ft 1000 - lt 1009 <data>
+		// Block 1: ft 1010 - lt 1019 <data>
+		//
+		// If we are aiming to find time 1005, we would first seek to the block with
+		// supertime 1010, then need to rewind by one block by virtue of LevelDB
+		// iterator seek behavior.
+		//
+		// Only do the rewind if there is another chunk before this one.
+		if !foundKey.MayContain(ts) {
+			postValues, _ := extractSampleValues(iterator)
+			if !foundKey.Equal(firstBlock) {
+				if !iterator.Previous() {
+					panic("This should never return false.")
+				}
 
-	foundKey, err := extractSampleKey(iterator)
-	if err != nil {
-		panic(err)
-	}
+				foundKey, _ = extractSampleKey(iterator)
 
-	// Figure out if we need to rewind by one block.
-	// Imagine the following supertime blocks with time ranges:
-	//
-	// Block 1: ft 1000 - lt 1009 <data>
-	// Block 1: ft 1010 - lt 1019 <data>
-	//
-	// If we are aiming to find time 1005, we would first seek to the block with
-	// supertime 1010, then need to rewind by one block by virtue of LevelDB
-	// iterator seek behavior.
-	//
-	// Only do the rewind if there is another chunk before this one.
-	rewound := false
-	firstTime := foundKey.FirstTimestamp
-	if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
-		iterator.Previous()
-		rewound = true
-	}
+				if !foundKey.Fingerprint.Equal(fingerprint) {
+					return postValues, false
+				}
 
-	foundValues, err = extractSampleValues(iterator)
-	if err != nil {
-		return
-	}
-
-	// If we rewound, but the target time is still past the current block, return
-	// the last value of the current (rewound) block and the entire next block.
-	if rewound {
-		foundKey, err = extractSampleKey(iterator)
-		if err != nil {
-			return
-		}
-		currentChunkLastTime := foundKey.LastTimestamp
-
-		if ts.After(currentChunkLastTime) {
-			sampleCount := len(foundValues)
-			chunk = append(chunk, foundValues[sampleCount-1])
-			// We know there's a next block since we have rewound from it.
-			iterator.Next()
-
-			foundValues, err = extractSampleValues(iterator)
-			if err != nil {
-				return
+				foundValues, _ = extractSampleValues(iterator)
+				foundValues = append(foundValues, postValues...)
+				return foundValues, false
 			}
 		}
+
+		foundValues, _ = extractSampleValues(iterator)
+		return foundValues, false
 	}
 
-	// Now append all the samples of the currently seeked block to the output.
-	chunk = append(chunk, foundValues...)
+	if fingerprint.Less(foundKey.Fingerprint) {
+		if !foundKey.Equal(firstBlock) {
+			if !iterator.Previous() {
+				panic("This should never return false.")
+			}
 
-	return
+			foundKey, _ = extractSampleKey(iterator)
+
+			if !foundKey.Fingerprint.Equal(fingerprint) {
+				return nil, false
+			}
+
+			foundValues, _ = extractSampleValues(iterator)
+			return foundValues, false
+		}
+	}
+
+	panic("illegal state: violated sort invariant")
 }
 
 // Get all label values that are associated with the provided label name.