From 5745ce0a60c968fbb887581b04f3142dc9cc06e1 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 7 Mar 2014 00:54:28 +0100 Subject: [PATCH] Fixups for single-op-per-fingerprint view rendering. Change-Id: Ie496d4529b65a3819c6042f43d7cf99e0e1ac60b --- storage/metric/freelist.go | 2 + storage/metric/tiered.go | 135 +++++++++++++++++++------------------ 2 files changed, 73 insertions(+), 64 deletions(-) diff --git a/storage/metric/freelist.go b/storage/metric/freelist.go index b260dc0ed2..d542f73c8f 100644 --- a/storage/metric/freelist.go +++ b/storage/metric/freelist.go @@ -15,6 +15,7 @@ package metric import ( "time" + "github.com/prometheus/prometheus/utility" clientmodel "github.com/prometheus/client_golang/model" @@ -93,6 +94,7 @@ func (l *valueAtTimeList) Get(fp *clientmodel.Fingerprint, time clientmodel.Time } op.fp = *fp op.current = time + op.consumed = false return op } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 54b2f0c5d4..b69f9edf96 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -434,91 +434,98 @@ func (t *TieredStorage) renderView(viewJob viewJob) { memValues := t.memoryArena.CloneSamples(fp) - // Abort the view rendering if the caller (MakeView) has timed out. - if len(viewJob.abort) > 0 { - return - } + for !op.Consumed() { + // Abort the view rendering if the caller (MakeView) has timed out. + if len(viewJob.abort) > 0 { + return + } - // Load data value chunk(s) around the current time. - targetTime := op.CurrentTime() + // Load data value chunk(s) around the current time. + targetTime := op.CurrentTime() - currentChunk := chunk{} - // If we aimed before the oldest value in memory, load more data from disk. - if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent { - if iterator == nil { - // Get a single iterator that will be used for all data extraction - // below. - iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true) - defer iterator.Close() - if diskPresent = iterator.SeekToLast(); diskPresent { - if err := iterator.Key(sampleKeyDto); err != nil { - panic(err) - } - - lastBlock.Load(sampleKeyDto) - - if !iterator.SeekToFirst() { - diskPresent = false - } else { + currentChunk := chunk{} + // If we aimed before the oldest value in memory, load more data from disk. + if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent { + if iterator == nil { + // Get a single iterator that will be used for all data extraction + // below. + iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true) + defer iterator.Close() + if diskPresent = iterator.SeekToLast(); diskPresent { if err := iterator.Key(sampleKeyDto); err != nil { panic(err) } - firstBlock.Load(sampleKeyDto) + lastBlock.Load(sampleKeyDto) + + if !iterator.SeekToFirst() { + diskPresent = false + } else { + if err := iterator.Key(sampleKeyDto); err != nil { + panic(err) + } + + firstBlock.Load(sampleKeyDto) + } } } - } - if diskPresent { - diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start() - diskValues, expired := t.loadChunkAroundTime( - iterator, - fp, - targetTime, - firstBlock, - lastBlock, - ) - if expired { - diskPresent = false - } - diskTimer.Stop() + if diskPresent { + diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start() + diskValues, expired := t.loadChunkAroundTime( + iterator, + fp, + targetTime, + firstBlock, + lastBlock, + ) + if expired { + diskPresent = false + } + diskTimer.Stop() - // If we aimed past the newest value on disk, - // combine it with the next value from memory. - if len(diskValues) == 0 { - currentChunk = chunk(memValues) - } else { - if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { - latestDiskValue := diskValues[len(diskValues)-1:] - currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + // If we aimed past the newest value on disk, + // combine it with the next value from memory. + if len(diskValues) == 0 { + currentChunk = chunk(memValues) } else { - currentChunk = chunk(diskValues) + if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { + latestDiskValue := diskValues[len(diskValues)-1:] + currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + } else { + currentChunk = chunk(diskValues) + } } + } else { + currentChunk = chunk(memValues) } } else { currentChunk = chunk(memValues) } - } else { - currentChunk = chunk(memValues) - } - // There's no data at all for this fingerprint, so stop processing. - if len(currentChunk) == 0 { - continue - } + // There's no data at all for this fingerprint, so stop processing. + if len(currentChunk) == 0 { + break + } - currentChunk = currentChunk.TruncateBefore(targetTime) + currentChunk = currentChunk.TruncateBefore(targetTime) - lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp - if lastChunkTime.After(targetTime) { - targetTime = lastChunkTime - } + lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp + if lastChunkTime.After(targetTime) { + targetTime = lastChunkTime + } - // Extract all needed data from the current chunk and append the - // extracted samples to the materialized view. - for !op.Consumed() && !op.CurrentTime().After(targetTime) { - view.appendSamples(fp, op.ExtractSamples(Values(currentChunk))) + if op.CurrentTime().After(targetTime) { + break + } + + // Extract all needed data from the current chunk and append the + // extracted samples to the materialized view. + for !op.Consumed() && !op.CurrentTime().After(targetTime) { + view.appendSamples(fp, op.ExtractSamples(Values(currentChunk))) + } } + giveBackOp(op) } extractionTimer.Stop()