Merge pull request #278 from prometheus/debug/query-timings

Add debug timers to instant and range queries.
This commit is contained in:
juliusv 2013-06-05 09:41:51 -07:00
commit 115437ad1f
12 changed files with 280 additions and 22 deletions

View file

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"time" "time"
@ -90,7 +91,7 @@ type AlertingRule struct {
func (rule AlertingRule) Name() string { return rule.name } func (rule AlertingRule) Name() string { return rule.name }
func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage) return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
} }
func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {

View file

@ -17,6 +17,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"log" "log"
"math" "math"
@ -255,8 +256,8 @@ func labelsToKey(labels model.Metric) string {
return strings.Join(keyParts, ",") // TODO not safe when label value contains comma. return strings.Join(keyParts, ",") // TODO not safe when label value contains comma.
} }
func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.TieredStorage) (vector Vector, err error) { func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (vector Vector, err error) {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage) viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
if err != nil { if err != nil {
return return
} }
@ -264,16 +265,20 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.Tie
return return
} }
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (Matrix, error) { func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (Matrix, error) {
// Explicitly initialize to an empty matrix since a nil Matrix encodes to // Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON. // null in JSON.
matrix := Matrix{} matrix := Matrix{}
viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage) viewTimer := queryStats.GetTimer(stats.TotalViewBuildingTime).Start()
viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage, queryStats)
viewTimer.Stop()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO implement watchdog timer for long-running queries. // TODO implement watchdog timer for long-running queries.
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
sampleSets := map[string]*model.SampleSet{} sampleSets := map[string]*model.SampleSet{}
for t := start; t.Before(end); t = t.Add(interval) { for t := start; t.Before(end); t = t.Add(interval) {
vector := node.Eval(t, viewAdapter) vector := node.Eval(t, viewAdapter)
@ -293,10 +298,13 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t
} }
} }
} }
evalTimer.Stop()
appendTimer := queryStats.GetTimer(stats.ResultAppendTime).Start()
for _, sampleSet := range sampleSets { for _, sampleSet := range sampleSets {
matrix = append(matrix, *sampleSet) matrix = append(matrix, *sampleSet)
} }
appendTimer.Stop()
return matrix, nil return matrix, nil
} }

View file

@ -16,6 +16,7 @@ package ast
import ( import (
"flag" "flag"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"time" "time"
) )
@ -39,6 +40,8 @@ type viewAdapter struct {
// The materialized view which contains all timeseries data required for // The materialized view which contains all timeseries data required for
// executing a query. // executing a query.
view metric.View view metric.View
// The TimerGroup object in which to capture query timing statistics.
stats *stats.TimerGroup
} }
// interpolateSamples interpolates a value at a target time between two // interpolateSamples interpolates a value at a target time between two
@ -105,6 +108,7 @@ func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.T
} }
func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) { func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) {
timer := v.stats.GetTimer(stats.GetValueAtTimeTime).Start()
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp) sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp)
samplePair := v.chooseClosestSample(sampleCandidates, timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp)
@ -120,10 +124,12 @@ func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp
}) })
} }
} }
timer.Stop()
return samples, err return samples, err
} }
func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) {
timer := v.stats.GetTimer(stats.GetBoundaryValuesTime).Start()
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
samplePairs := v.view.GetBoundaryValues(fingerprint, *interval) samplePairs := v.view.GetBoundaryValues(fingerprint, *interval)
if len(samplePairs) == 0 { if len(samplePairs) == 0 {
@ -142,10 +148,12 @@ func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interva
} }
sampleSets = append(sampleSets, sampleSet) sampleSets = append(sampleSets, sampleSet)
} }
timer.Stop()
return sampleSets, nil return sampleSets, nil
} }
func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) {
timer := v.stats.GetTimer(stats.GetRangeValuesTime).Start()
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
samplePairs := v.view.GetRangeValues(fingerprint, *interval) samplePairs := v.view.GetRangeValues(fingerprint, *interval)
if len(samplePairs) == 0 { if len(samplePairs) == 0 {
@ -164,10 +172,11 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *
} }
sampleSets = append(sampleSets, sampleSet) sampleSets = append(sampleSets, sampleSet)
} }
timer.Stop()
return sampleSets, nil return sampleSets, nil
} }
func NewViewAdapter(view metric.View, storage *metric.TieredStorage) *viewAdapter { func NewViewAdapter(view metric.View, storage *metric.TieredStorage, queryStats *stats.TimerGroup) *viewAdapter {
stalenessPolicy := StalenessPolicy{ stalenessPolicy := StalenessPolicy{
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
} }
@ -176,5 +185,6 @@ func NewViewAdapter(view metric.View, storage *metric.TieredStorage) *viewAdapte
stalenessPolicy: stalenessPolicy, stalenessPolicy: stalenessPolicy,
storage: storage, storage: storage,
view: view, view: view,
stats: queryStats,
} }
} }

View file

@ -17,6 +17,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"sort" "sort"
@ -153,15 +154,19 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
return string(dataJSON) return string(dataJSON)
} }
func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *metric.TieredStorage) string { func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *metric.TieredStorage, queryStats *stats.TimerGroup) string {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage) viewTimer := queryStats.GetTimer(stats.TotalViewBuildingTime).Start()
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
viewTimer.Stop()
if err != nil { if err != nil {
panic(err) panic(err)
} }
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
switch node.Type() { switch node.Type() {
case SCALAR: case SCALAR:
scalar := node.(ScalarNode).Eval(timestamp, viewAdapter) scalar := node.(ScalarNode).Eval(timestamp, viewAdapter)
evalTimer.Stop()
switch format { switch format {
case TEXT: case TEXT:
return fmt.Sprintf("scalar: %v", scalar) return fmt.Sprintf("scalar: %v", scalar)
@ -170,6 +175,7 @@ func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *
} }
case VECTOR: case VECTOR:
vector := node.(VectorNode).Eval(timestamp, viewAdapter) vector := node.(VectorNode).Eval(timestamp, viewAdapter)
evalTimer.Stop()
switch format { switch format {
case TEXT: case TEXT:
return vector.String() return vector.String()
@ -178,6 +184,7 @@ func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *
} }
case MATRIX: case MATRIX:
matrix := node.(MatrixNode).Eval(timestamp, viewAdapter) matrix := node.(MatrixNode).Eval(timestamp, viewAdapter)
evalTimer.Stop()
switch format { switch format {
case TEXT: case TEXT:
return matrix.String() return matrix.String()
@ -186,6 +193,7 @@ func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *
} }
case STRING: case STRING:
str := node.(StringNode).Eval(timestamp, viewAdapter) str := node.(StringNode).Eval(timestamp, viewAdapter)
evalTimer.Stop()
switch format { switch format {
case TEXT: case TEXT:
return str return str

View file

@ -15,6 +15,7 @@ package ast
import ( import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"log" "log"
"time" "time"
@ -95,9 +96,13 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
} }
} }
func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (*viewAdapter, error) { func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage) analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node) analyzer.AnalyzeQueries(node)
analyzeTimer.Stop()
requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder() viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges { for fingerprint, rangeDuration := range analyzer.FullRanges {
viewBuilder.GetMetricRange(fingerprint, timestamp.Add(-rangeDuration), timestamp) viewBuilder.GetMetricRange(fingerprint, timestamp.Add(-rangeDuration), timestamp)
@ -105,16 +110,24 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.
for fingerprint := range analyzer.IntervalRanges { for fingerprint := range analyzer.IntervalRanges {
viewBuilder.GetMetricAtTime(fingerprint, timestamp) viewBuilder.GetMetricAtTime(fingerprint, timestamp)
} }
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second) requestBuildTimer.Stop()
buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second, queryStats)
buildTimer.Stop()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewViewAdapter(view, storage), nil return NewViewAdapter(view, storage, queryStats), nil
} }
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (*viewAdapter, error) { func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage) analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node) analyzer.AnalyzeQueries(node)
analyzeTimer.Stop()
requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder() viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges { for fingerprint, rangeDuration := range analyzer.FullRanges {
// TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder. // TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder.
@ -125,9 +138,13 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva
for fingerprint := range analyzer.IntervalRanges { for fingerprint := range analyzer.IntervalRanges {
viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval) viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval)
} }
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second) requestBuildTimer.Stop()
buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second, queryStats)
buildTimer.Stop()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewViewAdapter(view, storage), nil return NewViewAdapter(view, storage, queryStats), nil
} }

View file

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"time" "time"
) )
@ -32,7 +33,7 @@ type RecordingRule struct {
func (rule RecordingRule) Name() string { return rule.name } func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage) return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
} }
func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {

View file

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
"path" "path"
@ -377,7 +378,7 @@ func TestExpressions(t *testing.T) {
t.Errorf("%d. Test should fail, but didn't", i) t.Errorf("%d. Test should fail, but didn't", i)
} }
failed := false failed := false
resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT, tieredStorage) resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT, tieredStorage, stats.NewTimerGroup())
resultLines := strings.Split(resultStr, "\n") resultLines := strings.Split(resultStr, "\n")
if len(exprTest.output) != len(resultLines) { if len(exprTest.output) != len(resultLines) {

81
stats/query_stats.go Normal file
View file

@ -0,0 +1,81 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
// QueryTiming identifies the code area or functionality in which time is spent
// during a query.
type QueryTiming int
// Query timings.
const (
TotalEvalTime QueryTiming = iota
ResultSortTime
JsonEncodeTime
TotalViewBuildingTime
ViewRequestBuildTime
InnerViewBuildingTime
InnerEvalTime
ResultAppendTime
QueryAnalysisTime
GetValueAtTimeTime
GetBoundaryValuesTime
GetRangeValuesTime
ViewQueueTime
ViewDiskPreparationTime
ViewScanJobsTime
ViewDataExtractionTime
ViewDiskExtractionTime
)
// Return a string represenation of a QueryTiming identifier.
func (s QueryTiming) String() string {
switch s {
case TotalEvalTime:
return "Total eval time"
case ResultSortTime:
return "Result sorting time"
case JsonEncodeTime:
return "JSON encoding time"
case TotalViewBuildingTime:
return "Total view building time"
case ViewRequestBuildTime:
return "View request building time"
case InnerViewBuildingTime:
return "Inner view building time"
case InnerEvalTime:
return "Inner eval time"
case ResultAppendTime:
return "Result append time"
case QueryAnalysisTime:
return "Query analysis time"
case GetValueAtTimeTime:
return "GetValueAtTime() time"
case GetBoundaryValuesTime:
return "GetBoundaryValues() time"
case GetRangeValuesTime:
return "GetRangeValues() time"
case ViewQueueTime:
return "View queue wait time"
case ViewScanJobsTime:
return "View building job scanning time"
case ViewDiskPreparationTime:
return "View building disk preparation time"
case ViewDataExtractionTime:
return "Total view data extraction time"
case ViewDiskExtractionTime:
return "View disk data extraction time"
default:
return "Unknown query timing"
}
}

100
stats/timer.go Normal file
View file

@ -0,0 +1,100 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stats
import (
"bytes"
"fmt"
"sort"
"time"
)
// A timer that can be started and stopped and accumulates the total time it
// was running (the time between Start() and Stop()).
type Timer struct {
name fmt.Stringer
created time.Time
start time.Time
duration time.Duration
}
// Start the timer.
func (t *Timer) Start() *Timer {
t.start = time.Now()
return t
}
// Stop the timer.
func (t *Timer) Stop() {
t.duration += time.Since(t.start)
}
// Return a string representation of the Timer.
func (t *Timer) String() string {
return fmt.Sprintf("%s: %s", t.name, t.duration)
}
// A TimerGroup represents a group of timers relevant to a single query.
type TimerGroup struct {
timers map[fmt.Stringer]*Timer
child *TimerGroup
}
// Construct a new TimerGroup.
func NewTimerGroup() *TimerGroup {
return &TimerGroup{timers: map[fmt.Stringer]*Timer{}}
}
// Get (and create, if necessary) the Timer for a given code section.
func (t *TimerGroup) GetTimer(name fmt.Stringer) *Timer {
if timer, exists := t.timers[name]; exists {
return timer
}
timer := &Timer{
name: name,
created: time.Now(),
}
t.timers[name] = timer
return timer
}
type Timers []*Timer
type byCreationTimeSorter struct{ Timers }
func (t Timers) Len() int {
return len(t)
}
func (t Timers) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}
func (s byCreationTimeSorter) Less(i, j int) bool {
return s.Timers[i].created.Before(s.Timers[j].created)
}
// Return a string representation of a TimerGroup.
func (t *TimerGroup) String() string {
timers := byCreationTimeSorter{}
for _, timer := range t.timers {
timers.Timers = append(timers.Timers, timer)
}
sort.Sort(timers)
result := &bytes.Buffer{}
for _, timer := range timers.Timers {
fmt.Fprintf(result, "%s\n", timer)
}
return result.String()
}

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
) )
@ -77,6 +78,7 @@ type viewJob struct {
output chan View output chan View
abort chan bool abort chan bool
err chan error err chan error
stats *stats.TimerGroup
} }
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) { func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
@ -120,7 +122,7 @@ func (t *TieredStorage) Drain() {
} }
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout. // Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error) { func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
if len(t.draining) > 0 { if len(t.draining) > 0 {
return nil, fmt.Errorf("Storage is in the process of draining.") return nil, fmt.Errorf("Storage is in the process of draining.")
} }
@ -133,11 +135,13 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
// has already exited and doesn't consume from the channel anymore. // has already exited and doesn't consume from the channel anymore.
abortChan := make(chan bool, 1) abortChan := make(chan bool, 1)
errChan := make(chan error) errChan := make(chan error)
queryStats.GetTimer(stats.ViewQueueTime).Start()
t.viewQueue <- viewJob{ t.viewQueue <- viewJob{
builder: builder, builder: builder,
output: result, output: result,
abort: abortChan, abort: abortChan,
err: errChan, err: errChan,
stats: queryStats,
} }
select { select {
@ -169,6 +173,7 @@ func (t *TieredStorage) Serve() {
case <-flushMemoryTicker.C: case <-flushMemoryTicker.C:
t.flushMemory(t.memoryTTL) t.flushMemory(t.memoryTTL)
case viewRequest := <-t.viewQueue: case viewRequest := <-t.viewQueue:
viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
t.renderView(viewRequest) t.renderView(viewRequest)
case drainingDone := <-t.draining: case drainingDone := <-t.draining:
t.Flush() t.Flush()
@ -260,13 +265,16 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
}() }()
scanJobsTimer := viewJob.stats.GetTimer(stats.ViewScanJobsTime).Start()
scans := viewJob.builder.ScanJobs() scans := viewJob.builder.ScanJobs()
scanJobsTimer.Stop()
view := newView() view := newView()
var iterator leveldb.Iterator = nil var iterator leveldb.Iterator = nil
var diskFrontier *diskFrontier = nil var diskFrontier *diskFrontier = nil
var diskPresent = true var diskPresent = true
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for _, scanJob := range scans { for _, scanJob := range scans {
var seriesFrontier *seriesFrontier = nil var seriesFrontier *seriesFrontier = nil
var seriesPresent = true var seriesPresent = true
@ -286,6 +294,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
currentChunk := chunk{} currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk. // If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent { if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent {
diskPrepareTimer := viewJob.stats.GetTimer(stats.ViewDiskPreparationTime).Start()
// Conditionalize disk access. // Conditionalize disk access.
if diskFrontier == nil && diskPresent { if diskFrontier == nil && diskPresent {
if iterator == nil { if iterator == nil {
@ -310,9 +319,12 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
panic(err) panic(err)
} }
} }
diskPrepareTimer.Stop()
if diskPresent && seriesPresent { if diskPresent && seriesPresent {
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
diskTimer.Stop()
// If we aimed past the newest value on disk, combine it with the next value from memory. // If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
@ -381,6 +393,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
sort.Sort(startsAtSort{standingOps}) sort.Sort(startsAtSort{standingOps})
} }
} }
extractionTimer.Stop()
viewJob.output <- view viewJob.output <- view
return return

View file

@ -15,6 +15,7 @@ package metric
import ( import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
"sort" "sort"
"testing" "testing"
@ -363,7 +364,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
requestBuilder.GetMetricRange(fingerprint, alongRange.from, alongRange.through) requestBuilder.GetMetricRange(fingerprint, alongRange.from, alongRange.through)
} }
v, err := tiered.MakeView(requestBuilder, time.Second*5) v, err := tiered.MakeView(requestBuilder, time.Second*5, stats.NewTimerGroup())
if err != nil { if err != nil {
t.Fatalf("%d. failed due to %s", i, err) t.Fatalf("%d. failed due to %s", i, err)

View file

@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/stats"
"log" "log"
"net/http" "net/http"
"sort" "sort"
@ -33,7 +34,7 @@ func (serv MetricsService) setAccessControlHeaders(rb *gorest.ResponseBuilder) {
rb.AddHeader("Access-Control-Expose-Headers", "Date") rb.AddHeader("Access-Control-Expose-Headers", "Date")
} }
func (serv MetricsService) Query(expr string, formatJson string) (result string) { func (serv MetricsService) Query(expr string, formatJson string) string {
exprNode, err := rules.LoadExprFromString(expr) exprNode, err := rules.LoadExprFromString(expr)
if err != nil { if err != nil {
return ast.ErrorToJSON(err) return ast.ErrorToJSON(err)
@ -52,7 +53,10 @@ func (serv MetricsService) Query(expr string, formatJson string) (result string)
rb.SetContentType(gorest.Text_Plain) rb.SetContentType(gorest.Text_Plain)
} }
return ast.EvalToString(exprNode, timestamp, format, serv.Storage) queryStats := stats.NewTimerGroup()
result := ast.EvalToString(exprNode, timestamp, format, serv.Storage, queryStats)
log.Printf("Instant query: %s\nQuery stats:\n%s\n", expr, queryStats)
return result
} }
func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string { func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string {
@ -82,18 +86,31 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st
// Align the start to step "tick" boundary. // Align the start to step "tick" boundary.
end -= end % step end -= end % step
queryStats := stats.NewTimerGroup()
evalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
matrix, err := ast.EvalVectorRange( matrix, err := ast.EvalVectorRange(
exprNode.(ast.VectorNode), exprNode.(ast.VectorNode),
time.Unix(end-duration, 0).UTC(), time.Unix(end-duration, 0).UTC(),
time.Unix(end, 0).UTC(), time.Unix(end, 0).UTC(),
time.Duration(step)*time.Second, time.Duration(step)*time.Second,
serv.Storage) serv.Storage,
queryStats)
if err != nil { if err != nil {
return ast.ErrorToJSON(err) return ast.ErrorToJSON(err)
} }
evalTimer.Stop()
sortTimer := queryStats.GetTimer(stats.ResultSortTime).Start()
sort.Sort(matrix) sort.Sort(matrix)
return ast.TypedValueToJSON(matrix, "matrix") sortTimer.Stop()
jsonTimer := queryStats.GetTimer(stats.JsonEncodeTime).Start()
result := ast.TypedValueToJSON(matrix, "matrix")
jsonTimer.Stop()
log.Printf("Range query: %s\nQuery stats:\n%s\n", expr, queryStats)
return result
} }
func (serv MetricsService) Metrics() string { func (serv MetricsService) Metrics() string {