Query timeout added.
This is related to #454. Queries now time out after a duration set by the -query.timeout flag. The TotalEvalTime timer is now started and stopped inside each of the ast.Eval* functions.
parent bd4d04f371
commit fa1e90003b
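The pattern is the same at every site in the diff below: each ast.Eval* entry point starts the TotalEvalTime timer (and defers stopping it), and the elapsed time is then compared against *queryTimeout at natural checkpoints: after query preparation, at the top of every range-evaluation step, and before each preload in prepareInstantQuery and prepareRangeQuery. The following is a minimal, self-contained sketch of that checkpoint pattern; the timer type and evalRange function are simplified stand-ins for illustration, not the repository's actual stats.TimerGroup or ast APIs.

    package main

    import (
        "flag"
        "fmt"
        "time"
    )

    // Names below mirror the diff (queryTimeout, queryTimeoutError); the rest
    // is a simplified stand-in for illustration only.
    var queryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")

    type queryTimeoutError struct {
        timeoutAfter time.Duration
    }

    func (e queryTimeoutError) Error() string {
        return fmt.Sprintf("query timeout after %v", e.timeoutAfter)
    }

    // timer is a minimal stand-in for stats.Timer: it only tracks a start time.
    type timer struct{ start time.Time }

    func startTimer() *timer { return &timer{start: time.Now()} }

    // ElapsedTime reports how long the timer has been running.
    func (t *timer) ElapsedTime() time.Duration { return time.Since(t.start) }

    // evalRange mimics the shape of EvalVectorRange: the total-eval timer is
    // started once, and every evaluation step first checks whether the elapsed
    // time has exceeded the configured timeout.
    func evalRange(steps int, evalStep func(i int)) error {
        totalEvalTimer := startTimer()
        for i := 0; i < steps; i++ {
            if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
                return queryTimeoutError{et}
            }
            evalStep(i)
        }
        return nil
    }

    func main() {
        flag.Parse()
        // With the default 2m timeout this finishes normally; run with
        // -query.timeout=1ms to see the timeout error instead.
        err := evalRange(3, func(i int) { time.Sleep(10 * time.Millisecond) })
        fmt.Println("error:", err)
    }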
@@ -29,7 +29,16 @@ import (
 	"github.com/prometheus/prometheus/storage/metric"
 )
 
-var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
+var (
+	stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
+	queryTimeout   = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
+)
+
+type queryTimeoutError struct {
+	timeoutAfter time.Duration
+}
+
+func (e queryTimeoutError) Error() string { return fmt.Sprintf("query timeout after %v", e.timeoutAfter) }
 
 // ----------------------------------------------------------------------------
 // Raw data value types.

@@ -391,16 +400,24 @@ func labelsToKey(labels clientmodel.Metric) uint64 {
 
 // EvalVectorInstant evaluates a VectorNode with an instant query.
 func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
+	totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
+	defer totalEvalTimer.Stop()
+
 	closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
 	if err != nil {
 		return nil, err
 	}
 	defer closer.Close()
+	if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
+		return nil, queryTimeoutError{et}
+	}
 	return node.Eval(timestamp), nil
 }
 
 // EvalVectorRange evaluates a VectorNode with a range query.
 func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) {
+	totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
+	defer totalEvalTimer.Stop()
 	// Explicitly initialize to an empty matrix since a nil Matrix encodes to
 	// null in JSON.
 	matrix := Matrix{}

@@ -413,10 +430,13 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod
 	}
 	defer closer.Close()
 
-	// TODO implement watchdog timer for long-running queries.
 	evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
 	sampleStreams := map[uint64]*SampleStream{}
 	for t := start; !t.After(end); t = t.Add(interval) {
+		if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
+			evalTimer.Stop()
+			return nil, queryTimeoutError{et}
+		}
 		vector := node.Eval(t)
 		for _, sample := range vector {
 			samplePair := metric.SamplePair{

@@ -160,6 +160,9 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
 
 // EvalToString evaluates the given node into a string of the given format.
 func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage local.Storage, queryStats *stats.TimerGroup) string {
+	totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
+	defer totalEvalTimer.Stop()
+
 	prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
 	closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
 	prepareTimer.Stop()

@@ -212,6 +215,9 @@ func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputForma
 
 // EvalToVector evaluates the given node into a Vector. Matrices aren't supported.
 func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
+	totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
+	defer totalEvalTimer.Stop()
+
 	prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
 	closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
 	prepareTimer.Stop()

@@ -110,22 +110,35 @@ func (i *iteratorInitializer) Visit(node Node) {
 }
 
 func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) {
+	totalTimer := queryStats.GetTimer(stats.TotalEvalTime)
+
 	analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
 	analyzer := NewQueryAnalyzer(storage)
 	Walk(analyzer, node)
 	analyzeTimer.Stop()
 
-	// TODO: Preloading should time out after a given duration.
 	preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start()
 	p := storage.NewPreloader()
 	for fp, rangeDuration := range analyzer.FullRanges {
+		if et := totalTimer.ElapsedTime(); et > *queryTimeout {
+			preloadTimer.Stop()
+			p.Close()
+			return nil, queryTimeoutError{et}
+		}
 		if err := p.PreloadRange(fp, timestamp.Add(-rangeDuration), timestamp, *stalenessDelta); err != nil {
+			preloadTimer.Stop()
 			p.Close()
 			return nil, err
 		}
 	}
 	for fp := range analyzer.IntervalRanges {
+		if et := totalTimer.ElapsedTime(); et > *queryTimeout {
+			preloadTimer.Stop()
+			p.Close()
+			return nil, queryTimeoutError{et}
+		}
 		if err := p.PreloadRange(fp, timestamp, timestamp, *stalenessDelta); err != nil {
+			preloadTimer.Stop()
 			p.Close()
 			return nil, err
 		}

@@ -141,16 +154,23 @@ func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage loc
 }
 
 func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) {
+	totalTimer := queryStats.GetTimer(stats.TotalEvalTime)
+
 	analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
 	analyzer := NewQueryAnalyzer(storage)
 	Walk(analyzer, node)
 	analyzeTimer.Stop()
 
-	// TODO: Preloading should time out after a given duration.
 	preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start()
 	p := storage.NewPreloader()
 	for fp, rangeDuration := range analyzer.FullRanges {
+		if et := totalTimer.ElapsedTime(); et > *queryTimeout {
+			preloadTimer.Stop()
+			p.Close()
+			return nil, queryTimeoutError{et}
+		}
 		if err := p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta); err != nil {
+			preloadTimer.Stop()
 			p.Close()
 			return nil, err
 		}

@@ -169,7 +189,13 @@ func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.T
 	*/
 	}
 	for fp := range analyzer.IntervalRanges {
+		if et := totalTimer.ElapsedTime(); et > *queryTimeout {
+			preloadTimer.Stop()
+			p.Close()
+			return nil, queryTimeoutError{et}
+		}
 		if err := p.PreloadRange(fp, start, end, *stalenessDelta); err != nil {
+			preloadTimer.Stop()
 			p.Close()
 			return nil, err
 		}

@@ -40,6 +40,11 @@ func (t *Timer) Stop() {
 	t.duration += time.Since(t.start)
 }
 
+// ElapsedTime returns the time that passed since starting the timer.
+func (t *Timer) ElapsedTime() time.Duration {
+	return time.Since(t.start)
+}
+
 // Return a string representation of the Timer.
 func (t *Timer) String() string {
 	return fmt.Sprintf("%s: %s", t.name, t.duration)

@@ -65,7 +65,6 @@ func (serv MetricsService) Query(w http.ResponseWriter, r *http.Request) {
 	}
 
 	timestamp := clientmodel.TimestampFromTime(serv.time.Now())
-
 	queryStats := stats.NewTimerGroup()
 	result := ast.EvalToString(exprNode, timestamp, format, serv.Storage, queryStats)
 	glog.V(1).Infof("Instant query: %s\nQuery stats:\n%s\n", expr, queryStats)

@@ -123,7 +122,6 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) {
 
 	queryStats := stats.NewTimerGroup()
 
-	evalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
 	matrix, err := ast.EvalVectorRange(
 		exprNode.(ast.VectorNode),
 		clientmodel.TimestampFromUnixNano(end-duration),

@@ -135,7 +133,6 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) {
 		fmt.Fprint(w, ast.ErrorToJSON(err))
 		return
 	}
-	evalTimer.Stop()
 
 	sortTimer := queryStats.GetTimer(stats.ResultSortTime).Start()
 	sort.Sort(matrix)

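Taken together, the query timeout is now an operator-settable limit rather than a TODO comment. Assuming the single-dash Go flag syntax Prometheus used at the time (the binary invocation itself is not part of this diff), lowering the limit from its two-minute default would look something like:

    prometheus -query.timeout=30s

A query that runs past the limit is aborted at its next checkpoint and surfaces the "query timeout after ..." error defined by queryTimeoutError.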