Fixups for single-op-per-fingerprint view rendering.

Change-Id: Ie496d4529b65a3819c6042f43d7cf99e0e1ac60b
This commit is contained in:
Julius Volz 2014-03-07 00:54:28 +01:00
parent 8b43497002
commit 5745ce0a60
2 changed files with 73 additions and 64 deletions

View file

@ -15,6 +15,7 @@ package metric
import ( import (
"time" "time"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
@ -93,6 +94,7 @@ func (l *valueAtTimeList) Get(fp *clientmodel.Fingerprint, time clientmodel.Time
} }
op.fp = *fp op.fp = *fp
op.current = time op.current = time
op.consumed = false
return op return op
} }

View file

@ -434,91 +434,98 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
memValues := t.memoryArena.CloneSamples(fp) memValues := t.memoryArena.CloneSamples(fp)
// Abort the view rendering if the caller (MakeView) has timed out. for !op.Consumed() {
if len(viewJob.abort) > 0 { // Abort the view rendering if the caller (MakeView) has timed out.
return if len(viewJob.abort) > 0 {
} return
}
// Load data value chunk(s) around the current time. // Load data value chunk(s) around the current time.
targetTime := op.CurrentTime() targetTime := op.CurrentTime()
currentChunk := chunk{} currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk. // If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent { if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
if iterator == nil { if iterator == nil {
// Get a single iterator that will be used for all data extraction // Get a single iterator that will be used for all data extraction
// below. // below.
iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true) iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close() defer iterator.Close()
if diskPresent = iterator.SeekToLast(); diskPresent { if diskPresent = iterator.SeekToLast(); diskPresent {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
lastBlock.Load(sampleKeyDto)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
if err := iterator.Key(sampleKeyDto); err != nil { if err := iterator.Key(sampleKeyDto); err != nil {
panic(err) panic(err)
} }
firstBlock.Load(sampleKeyDto) lastBlock.Load(sampleKeyDto)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
firstBlock.Load(sampleKeyDto)
}
} }
} }
}
if diskPresent { if diskPresent {
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start() diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
diskValues, expired := t.loadChunkAroundTime( diskValues, expired := t.loadChunkAroundTime(
iterator, iterator,
fp, fp,
targetTime, targetTime,
firstBlock, firstBlock,
lastBlock, lastBlock,
) )
if expired { if expired {
diskPresent = false diskPresent = false
} }
diskTimer.Stop() diskTimer.Stop()
// If we aimed past the newest value on disk, // If we aimed past the newest value on disk,
// combine it with the next value from memory. // combine it with the next value from memory.
if len(diskValues) == 0 { if len(diskValues) == 0 {
currentChunk = chunk(memValues) currentChunk = chunk(memValues)
} else {
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else { } else {
currentChunk = chunk(diskValues) if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
} }
} else {
currentChunk = chunk(memValues)
} }
} else { } else {
currentChunk = chunk(memValues) currentChunk = chunk(memValues)
} }
} else {
currentChunk = chunk(memValues)
}
// There's no data at all for this fingerprint, so stop processing. // There's no data at all for this fingerprint, so stop processing.
if len(currentChunk) == 0 { if len(currentChunk) == 0 {
continue break
} }
currentChunk = currentChunk.TruncateBefore(targetTime) currentChunk = currentChunk.TruncateBefore(targetTime)
lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp
if lastChunkTime.After(targetTime) { if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime targetTime = lastChunkTime
} }
// Extract all needed data from the current chunk and append the if op.CurrentTime().After(targetTime) {
// extracted samples to the materialized view. break
for !op.Consumed() && !op.CurrentTime().After(targetTime) { }
view.appendSamples(fp, op.ExtractSamples(Values(currentChunk)))
// Extract all needed data from the current chunk and append the
// extracted samples to the materialized view.
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
view.appendSamples(fp, op.ExtractSamples(Values(currentChunk)))
}
} }
giveBackOp(op)
} }
extractionTimer.Stop() extractionTimer.Stop()