mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
Add initial staleness handing to promql.
For instant vectors, if "stale" is the newest sample ignore the timeseries. For range vectors, filter out "stale" samples. Make it possible to inject "stale" samples in promql tests.
This commit is contained in:
parent
5060a0fc51
commit
80b40e6d91
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
|
"github.com/prometheus/prometheus/pkg/value"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
@ -756,6 +757,9 @@ func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if math.Float64bits(v) == value.StaleNaN {
|
||||||
|
continue
|
||||||
|
}
|
||||||
vec = append(vec, Sample{
|
vec = append(vec, Sample{
|
||||||
Metric: node.series[i].Labels(),
|
Metric: node.series[i].Labels(),
|
||||||
Point: Point{V: v, T: t},
|
Point: Point{V: v, T: t},
|
||||||
|
@ -826,6 +830,9 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
|
||||||
buf := it.Buffer()
|
buf := it.Buffer()
|
||||||
for buf.Next() {
|
for buf.Next() {
|
||||||
t, v := buf.At()
|
t, v := buf.At()
|
||||||
|
if math.Float64bits(v) == value.StaleNaN {
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Values in the buffer are guaranteed to be smaller than maxt.
|
// Values in the buffer are guaranteed to be smaller than maxt.
|
||||||
if t >= mint {
|
if t >= mint {
|
||||||
allPoints = append(allPoints, Point{T: t, V: v})
|
allPoints = append(allPoints, Point{T: t, V: v})
|
||||||
|
@ -833,7 +840,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
|
||||||
}
|
}
|
||||||
// The seeked sample might also be in the range.
|
// The seeked sample might also be in the range.
|
||||||
t, v = it.Values()
|
t, v = it.Values()
|
||||||
if t == maxt {
|
if t == maxt && math.Float64bits(v) != value.StaleNaN {
|
||||||
allPoints = append(allPoints, Point{T: t, V: v})
|
allPoints = append(allPoints, Point{T: t, V: v})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ package promql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -24,6 +25,7 @@ import (
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/pkg/value"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/util/strutil"
|
"github.com/prometheus/prometheus/util/strutil"
|
||||||
)
|
)
|
||||||
|
@ -201,17 +203,25 @@ func (p *parser) parseSeriesDesc() (m labels.Labels, vals []sequenceValue, err e
|
||||||
sign = -1
|
sign = -1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
k := sign * p.number(p.expect(itemNumber, ctx).val)
|
var k float64
|
||||||
|
if t := p.peek().typ; t == itemNumber {
|
||||||
|
k = sign * p.number(p.expect(itemNumber, ctx).val)
|
||||||
|
} else if t == itemIdentifier && p.peek().val == "stale" {
|
||||||
|
p.next()
|
||||||
|
k = math.Float64frombits(value.StaleNaN)
|
||||||
|
} else {
|
||||||
|
p.errorf("expected number or 'stale' in %s but got %s", ctx, t.desc())
|
||||||
|
}
|
||||||
vals = append(vals, sequenceValue{
|
vals = append(vals, sequenceValue{
|
||||||
value: k,
|
value: k,
|
||||||
})
|
})
|
||||||
|
|
||||||
// If there are no offset repetitions specified, proceed with the next value.
|
// If there are no offset repetitions specified, proceed with the next value.
|
||||||
if t := p.peek().typ; t == itemNumber || t == itemBlank {
|
if t := p.peek(); t.typ == itemNumber || t.typ == itemBlank || t.typ == itemIdentifier && t.val == "stale" {
|
||||||
continue
|
continue
|
||||||
} else if t == itemEOF {
|
} else if t.typ == itemEOF {
|
||||||
break
|
break
|
||||||
} else if t != itemADD && t != itemSUB {
|
} else if t.typ != itemADD && t.typ != itemSUB {
|
||||||
p.errorf("expected next value or relative expansion in %s but got %s", ctx, t.desc())
|
p.errorf("expected next value or relative expansion in %s but got %s", ctx, t.desc())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
24
promql/testdata/staleness.test
vendored
Normal file
24
promql/testdata/staleness.test
vendored
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
load 10s
|
||||||
|
metric 0 1 stale 2
|
||||||
|
|
||||||
|
# Instant vector doesn't return series when stale.
|
||||||
|
eval instant at 10s metric
|
||||||
|
{__name__="metric"} 1
|
||||||
|
|
||||||
|
eval instant at 20s metric
|
||||||
|
|
||||||
|
eval instant at 30s metric
|
||||||
|
{__name__="metric"} 2
|
||||||
|
|
||||||
|
|
||||||
|
# Range vector ignores stale sample.
|
||||||
|
eval instant at 30s count_over_time(metric[1m])
|
||||||
|
{} 3
|
||||||
|
|
||||||
|
eval instant at 10s count_over_time(metric[1s])
|
||||||
|
{} 1
|
||||||
|
|
||||||
|
eval instant at 20s count_over_time(metric[1s])
|
||||||
|
|
||||||
|
eval instant at 20s count_over_time(metric[10s])
|
||||||
|
{} 1
|
Loading…
Reference in a new issue