mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Use custom timestamp type for sample timestamps and related code.
So far we've been using Go's native time.Time for anything related to sample timestamps. Since the range of time.Time is much bigger than what we need, this has created two problems: - there could be time.Time values which were out of the range/precision of the time type that we persist to disk, therefore causing incorrectly ordered keys. One bug caused by this was: https://github.com/prometheus/prometheus/issues/367 It would be good to use a timestamp type that's more closely aligned with what the underlying storage supports. - sizeof(time.Time) is 192, while Prometheus should be ok with a single 64-bit Unix timestamp (possibly even a 32-bit one). Since we store samples in large numbers, this seriously affects memory usage. Furthermore, copying/working with the data will be faster if it's smaller. *MEMORY USAGE RESULTS* Initial memory usage comparisons for a running Prometheus with 1 timeseries and 100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my tests, this advantage for some reason decreased a bit the more samples the timeseries had (to 5-7% for millions of samples). This I can't fully explain, but perhaps garbage collection issues were involved. *WHEN TO USE THE NEW TIMESTAMP TYPE* The new clientmodel.Timestamp type should be used whenever time calculations are either directly or indirectly related to sample timestamps. For example: - the timestamp of a sample itself - all kinds of watermarks - anything that may become or is compared to a sample timestamp (like the timestamp passed into Target.Scrape()). When to still use time.Time: - for measuring durations/times not related to sample timestamps, like duration telemetry exporting, timers that indicate how frequently to execute some action, etc. *NOTE ON OPERATOR OPTIMIZATION TESTS* We don't use operator optimization code anymore, but it still lives in the code as dead code. It still has tests, but I couldn't get all of them to pass with the new timestamp format. I commented out the failing cases for now, but we should probably remove the dead code soon. I just didn't want to do that in the same change as this. Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
This commit is contained in:
parent
6b7de31a3c
commit
740d448983
|
@ -15,18 +15,19 @@ package indexable
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
// EncodeTimeInto writes the provided time into the specified buffer subject
|
||||
// to the LevelDB big endian key sort order requirement.
|
||||
func EncodeTimeInto(dst []byte, t time.Time) {
|
||||
func EncodeTimeInto(dst []byte, t clientmodel.Timestamp) {
|
||||
binary.BigEndian.PutUint64(dst, uint64(t.Unix()))
|
||||
}
|
||||
|
||||
// EncodeTime converts the provided time into a byte buffer subject to the
|
||||
// LevelDB big endian key sort order requirement.
|
||||
func EncodeTime(t time.Time) []byte {
|
||||
func EncodeTime(t clientmodel.Timestamp) []byte {
|
||||
buffer := make([]byte, 8)
|
||||
|
||||
EncodeTimeInto(buffer, t)
|
||||
|
@ -36,6 +37,6 @@ func EncodeTime(t time.Time) []byte {
|
|||
|
||||
// DecodeTime deserializes a big endian byte array into a Unix time in UTC,
|
||||
// omitting granularity precision less than a second.
|
||||
func DecodeTime(src []byte) time.Time {
|
||||
return time.Unix(int64(binary.BigEndian.Uint64(src)), 0).UTC()
|
||||
func DecodeTime(src []byte) clientmodel.Timestamp {
|
||||
return clientmodel.TimestampFromUnix(int64(binary.BigEndian.Uint64(src)))
|
||||
}
|
||||
|
|
|
@ -17,14 +17,15 @@ import (
|
|||
"math/rand"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
func TestTimeEndToEnd(t *testing.T) {
|
||||
tester := func(x int) bool {
|
||||
random := rand.New(rand.NewSource(int64(x)))
|
||||
buffer := make([]byte, 8)
|
||||
incoming := time.Unix(random.Int63(), 0)
|
||||
incoming := clientmodel.TimestampFromUnix(random.Int63())
|
||||
|
||||
EncodeTimeInto(buffer, incoming)
|
||||
outgoing := DecodeTime(buffer)
|
||||
|
|
4
main.go
4
main.go
|
@ -128,7 +128,7 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
|
|||
})
|
||||
defer curator.Close()
|
||||
|
||||
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
|
||||
return curator.Run(olderThan, clientmodel.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
|
||||
}
|
||||
|
||||
func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
|
||||
|
@ -152,7 +152,7 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
|
|||
})
|
||||
defer curator.Close()
|
||||
|
||||
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
|
||||
return curator.Run(olderThan, clientmodel.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
|
||||
}
|
||||
|
||||
func (p *prometheus) close() {
|
||||
|
|
|
@ -156,7 +156,7 @@ func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.La
|
|||
return target
|
||||
}
|
||||
|
||||
func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp time.Time, healthy bool) {
|
||||
func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clientmodel.Timestamp, healthy bool) {
|
||||
metric := clientmodel.Metric{}
|
||||
for label, value := range t.baseLabels {
|
||||
metric[label] = value
|
||||
|
@ -182,7 +182,7 @@ func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp time
|
|||
}
|
||||
|
||||
func (t *target) Scrape(earliest time.Time, ingester extraction.Ingester) error {
|
||||
now := time.Now()
|
||||
now := clientmodel.Now()
|
||||
futureState := t.state
|
||||
err := t.scrape(now, ingester)
|
||||
if err != nil {
|
||||
|
@ -202,7 +202,7 @@ func (t *target) Scrape(earliest time.Time, ingester extraction.Ingester) error
|
|||
|
||||
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1`
|
||||
|
||||
func (t *target) scrape(timestamp time.Time, ingester extraction.Ingester) (err error) {
|
||||
func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ingester) (err error) {
|
||||
defer func(start time.Time) {
|
||||
ms := float64(time.Since(start)) / float64(time.Millisecond)
|
||||
labels := map[string]string{address: t.Address(), outcome: success}
|
||||
|
|
|
@ -56,7 +56,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
|
|||
httpClient: utility.NewDeadlineClient(0),
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
now := clientmodel.Now()
|
||||
ingester := &collectResultIngester{}
|
||||
testTarget.recordScrapeHealth(ingester, now, true)
|
||||
|
||||
|
|
|
@ -68,13 +68,13 @@ type Alert struct {
|
|||
// The state of the alert (PENDING or FIRING).
|
||||
State AlertState
|
||||
// The time when the alert first transitioned into PENDING state.
|
||||
ActiveSince time.Time
|
||||
ActiveSince clientmodel.Timestamp
|
||||
// The value of the alert expression for this vector element.
|
||||
Value clientmodel.SampleValue
|
||||
}
|
||||
|
||||
// sample returns a Sample suitable for recording the alert.
|
||||
func (a Alert) sample(timestamp time.Time, value clientmodel.SampleValue) *clientmodel.Sample {
|
||||
func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *clientmodel.Sample {
|
||||
recordedMetric := clientmodel.Metric{}
|
||||
for label, value := range a.Labels {
|
||||
recordedMetric[label] = value
|
||||
|
@ -118,11 +118,11 @@ func (rule *AlertingRule) Name() string {
|
|||
return rule.name
|
||||
}
|
||||
|
||||
func (rule *AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
|
||||
}
|
||||
|
||||
func (rule *AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
// Get the raw value of the rule expression.
|
||||
exprResult, err := rule.EvalRaw(timestamp, storage)
|
||||
if err != nil {
|
||||
|
|
|
@ -103,23 +103,23 @@ type Node interface {
|
|||
// interface represents the type returned to the parent node.
|
||||
type ScalarNode interface {
|
||||
Node
|
||||
Eval(timestamp time.Time, view *viewAdapter) clientmodel.SampleValue
|
||||
Eval(timestamp clientmodel.Timestamp, view *viewAdapter) clientmodel.SampleValue
|
||||
}
|
||||
|
||||
type VectorNode interface {
|
||||
Node
|
||||
Eval(timestamp time.Time, view *viewAdapter) Vector
|
||||
Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Vector
|
||||
}
|
||||
|
||||
type MatrixNode interface {
|
||||
Node
|
||||
Eval(timestamp time.Time, view *viewAdapter) Matrix
|
||||
EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix
|
||||
Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Matrix
|
||||
EvalBoundaries(timestamp clientmodel.Timestamp, view *viewAdapter) Matrix
|
||||
}
|
||||
|
||||
type StringNode interface {
|
||||
Node
|
||||
Eval(timestamp time.Time, view *viewAdapter) string
|
||||
Eval(timestamp clientmodel.Timestamp, view *viewAdapter) string
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
@ -233,17 +233,17 @@ func (node MatrixLiteral) Children() Nodes { return Nodes{} }
|
|||
func (node StringLiteral) Children() Nodes { return Nodes{} }
|
||||
func (node StringFunctionCall) Children() Nodes { return node.args }
|
||||
|
||||
func (node *ScalarLiteral) Eval(timestamp time.Time, view *viewAdapter) clientmodel.SampleValue {
|
||||
func (node *ScalarLiteral) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) clientmodel.SampleValue {
|
||||
return node.value
|
||||
}
|
||||
|
||||
func (node *ScalarArithExpr) Eval(timestamp time.Time, view *viewAdapter) clientmodel.SampleValue {
|
||||
func (node *ScalarArithExpr) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) clientmodel.SampleValue {
|
||||
lhs := node.lhs.Eval(timestamp, view)
|
||||
rhs := node.rhs.Eval(timestamp, view)
|
||||
return evalScalarBinop(node.opType, lhs, rhs)
|
||||
}
|
||||
|
||||
func (node *ScalarFunctionCall) Eval(timestamp time.Time, view *viewAdapter) clientmodel.SampleValue {
|
||||
func (node *ScalarFunctionCall) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) clientmodel.SampleValue {
|
||||
return node.function.callFn(timestamp, view, node.args).(clientmodel.SampleValue)
|
||||
}
|
||||
|
||||
|
@ -277,7 +277,7 @@ func labelsToKey(labels clientmodel.Metric) uint64 {
|
|||
return summer.Sum64()
|
||||
}
|
||||
|
||||
func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (vector Vector, err error) {
|
||||
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (vector Vector, err error) {
|
||||
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -286,7 +286,7 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.Tie
|
|||
return
|
||||
}
|
||||
|
||||
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (Matrix, error) {
|
||||
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (Matrix, error) {
|
||||
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
|
||||
// null in JSON.
|
||||
matrix := Matrix{}
|
||||
|
@ -340,7 +340,7 @@ func labelIntersection(metric1, metric2 clientmodel.Metric) clientmodel.Metric {
|
|||
return intersection
|
||||
}
|
||||
|
||||
func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint64]*groupedAggregation, timestamp time.Time) Vector {
|
||||
func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint64]*groupedAggregation, timestamp clientmodel.Timestamp) Vector {
|
||||
vector := Vector{}
|
||||
for _, aggregation := range aggregations {
|
||||
switch node.aggrType {
|
||||
|
@ -361,7 +361,7 @@ func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint
|
|||
return vector
|
||||
}
|
||||
|
||||
func (node *VectorAggregation) Eval(timestamp time.Time, view *viewAdapter) Vector {
|
||||
func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Vector {
|
||||
vector := node.vector.Eval(timestamp, view)
|
||||
result := map[uint64]*groupedAggregation{}
|
||||
for _, sample := range vector {
|
||||
|
@ -399,7 +399,7 @@ func (node *VectorAggregation) Eval(timestamp time.Time, view *viewAdapter) Vect
|
|||
return node.groupedAggregationsToVector(result, timestamp)
|
||||
}
|
||||
|
||||
func (node *VectorLiteral) Eval(timestamp time.Time, view *viewAdapter) Vector {
|
||||
func (node *VectorLiteral) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Vector {
|
||||
values, err := view.GetValueAtTime(node.fingerprints, timestamp)
|
||||
if err != nil {
|
||||
glog.Error("Unable to get vector values: ", err)
|
||||
|
@ -408,7 +408,7 @@ func (node *VectorLiteral) Eval(timestamp time.Time, view *viewAdapter) Vector {
|
|||
return values
|
||||
}
|
||||
|
||||
func (node *VectorFunctionCall) Eval(timestamp time.Time, view *viewAdapter) Vector {
|
||||
func (node *VectorFunctionCall) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Vector {
|
||||
return node.function.callFn(timestamp, view, node.args).(Vector)
|
||||
}
|
||||
|
||||
|
@ -552,7 +552,7 @@ func labelsEqual(labels1, labels2 clientmodel.Metric) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (node *VectorArithExpr) Eval(timestamp time.Time, view *viewAdapter) Vector {
|
||||
func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Vector {
|
||||
lhs := node.lhs.Eval(timestamp, view)
|
||||
result := Vector{}
|
||||
if node.rhs.Type() == SCALAR {
|
||||
|
@ -583,7 +583,7 @@ func (node *VectorArithExpr) Eval(timestamp time.Time, view *viewAdapter) Vector
|
|||
panic("Invalid vector arithmetic expression operands")
|
||||
}
|
||||
|
||||
func (node *MatrixLiteral) Eval(timestamp time.Time, view *viewAdapter) Matrix {
|
||||
func (node *MatrixLiteral) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Matrix {
|
||||
interval := &metric.Interval{
|
||||
OldestInclusive: timestamp.Add(-node.interval),
|
||||
NewestInclusive: timestamp,
|
||||
|
@ -596,7 +596,7 @@ func (node *MatrixLiteral) Eval(timestamp time.Time, view *viewAdapter) Matrix {
|
|||
return values
|
||||
}
|
||||
|
||||
func (node *MatrixLiteral) EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix {
|
||||
func (node *MatrixLiteral) EvalBoundaries(timestamp clientmodel.Timestamp, view *viewAdapter) Matrix {
|
||||
interval := &metric.Interval{
|
||||
OldestInclusive: timestamp.Add(-node.interval),
|
||||
NewestInclusive: timestamp,
|
||||
|
@ -621,11 +621,11 @@ func (matrix Matrix) Swap(i, j int) {
|
|||
matrix[i], matrix[j] = matrix[j], matrix[i]
|
||||
}
|
||||
|
||||
func (node *StringLiteral) Eval(timestamp time.Time, view *viewAdapter) string {
|
||||
func (node *StringLiteral) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) string {
|
||||
return node.str
|
||||
}
|
||||
|
||||
func (node *StringFunctionCall) Eval(timestamp time.Time, view *viewAdapter) string {
|
||||
func (node *StringFunctionCall) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) string {
|
||||
return node.function.callFn(timestamp, view, node.args).(string)
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ type Function struct {
|
|||
name string
|
||||
argTypes []ExprType
|
||||
returnType ExprType
|
||||
callFn func(timestamp time.Time, view *viewAdapter, args []Node) interface{}
|
||||
callFn func(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{}
|
||||
}
|
||||
|
||||
func (function *Function) CheckArgTypes(args []Node) error {
|
||||
|
@ -68,12 +68,12 @@ func (function *Function) CheckArgTypes(args []Node) error {
|
|||
}
|
||||
|
||||
// === time() clientmodel.SampleValue ===
|
||||
func timeImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func timeImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
return clientmodel.SampleValue(time.Now().Unix())
|
||||
}
|
||||
|
||||
// === delta(matrix MatrixNode, isCounter ScalarNode) Vector ===
|
||||
func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func deltaImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
matrixNode := args[0].(MatrixNode)
|
||||
isCounter := args[1].(ScalarNode).Eval(timestamp, view) > 0
|
||||
resultVector := Vector{}
|
||||
|
@ -133,7 +133,7 @@ func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{}
|
|||
}
|
||||
|
||||
// === rate(node *MatrixNode) Vector ===
|
||||
func rateImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func rateImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
args = append(args, &ScalarLiteral{value: 1})
|
||||
vector := deltaImpl(timestamp, view, args).(Vector)
|
||||
|
||||
|
@ -164,7 +164,7 @@ func (sorter vectorByValueSorter) Swap(i, j int) {
|
|||
}
|
||||
|
||||
// === sort(node *VectorNode) Vector ===
|
||||
func sortImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func sortImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
byValueSorter := vectorByValueSorter{
|
||||
vector: args[0].(VectorNode).Eval(timestamp, view),
|
||||
}
|
||||
|
@ -173,7 +173,7 @@ func sortImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
|||
}
|
||||
|
||||
// === sortDesc(node *VectorNode) Vector ===
|
||||
func sortDescImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func sortDescImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
descByValueSorter := utility.ReverseSorter{
|
||||
vectorByValueSorter{
|
||||
vector: args[0].(VectorNode).Eval(timestamp, view),
|
||||
|
@ -184,7 +184,7 @@ func sortDescImpl(timestamp time.Time, view *viewAdapter, args []Node) interface
|
|||
}
|
||||
|
||||
// === sampleVectorImpl() Vector ===
|
||||
func sampleVectorImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func sampleVectorImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
return Vector{
|
||||
&clientmodel.Sample{
|
||||
Metric: clientmodel.Metric{
|
||||
|
@ -257,7 +257,7 @@ func sampleVectorImpl(timestamp time.Time, view *viewAdapter, args []Node) inter
|
|||
}
|
||||
|
||||
// === scalar(node *VectorNode) Scalar ===
|
||||
func scalarImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
|
||||
func scalarImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
|
||||
v := args[0].(VectorNode).Eval(timestamp, view)
|
||||
if len(v) != 1 {
|
||||
return clientmodel.SampleValue(math.NaN())
|
||||
|
|
|
@ -15,7 +15,6 @@ package ast
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -29,7 +28,7 @@ func (node emptyRangeNode) NodeTreeToDotGraph() string { return "" }
|
|||
func (node emptyRangeNode) String() string { return "" }
|
||||
func (node emptyRangeNode) Children() Nodes { return Nodes{} }
|
||||
|
||||
func (node emptyRangeNode) Eval(timestamp time.Time, view *viewAdapter) Matrix {
|
||||
func (node emptyRangeNode) Eval(timestamp clientmodel.Timestamp, view *viewAdapter) Matrix {
|
||||
return Matrix{
|
||||
metric.SampleSet{
|
||||
Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"},
|
||||
|
@ -38,7 +37,7 @@ func (node emptyRangeNode) Eval(timestamp time.Time, view *viewAdapter) Matrix {
|
|||
}
|
||||
}
|
||||
|
||||
func (node emptyRangeNode) EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix {
|
||||
func (node emptyRangeNode) EvalBoundaries(timestamp clientmodel.Timestamp, view *viewAdapter) Matrix {
|
||||
return Matrix{
|
||||
metric.SampleSet{
|
||||
Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "empty_metric"},
|
||||
|
@ -48,7 +47,7 @@ func (node emptyRangeNode) EvalBoundaries(timestamp time.Time, view *viewAdapter
|
|||
}
|
||||
|
||||
func TestDeltaWithEmptyElementDoesNotCrash(t *testing.T) {
|
||||
now := time.Now()
|
||||
now := clientmodel.Now()
|
||||
vector := deltaImpl(now, nil, []Node{emptyRangeNode{}, &ScalarLiteral{value: 0}}).(Vector)
|
||||
if len(vector) != 0 {
|
||||
t.Fatalf("Expected empty result vector, got: %v", vector)
|
||||
|
|
|
@ -48,7 +48,7 @@ type viewAdapter struct {
|
|||
|
||||
// interpolateSamples interpolates a value at a target time between two
|
||||
// provided sample pairs.
|
||||
func interpolateSamples(first, second *metric.SamplePair, timestamp time.Time) *metric.SamplePair {
|
||||
func interpolateSamples(first, second *metric.SamplePair, timestamp clientmodel.Timestamp) *metric.SamplePair {
|
||||
dv := second.Value - first.Value
|
||||
dt := second.Timestamp.Sub(first.Timestamp)
|
||||
|
||||
|
@ -65,7 +65,7 @@ func interpolateSamples(first, second *metric.SamplePair, timestamp time.Time) *
|
|||
// surrounding a given target time. If samples are found both before and after
|
||||
// the target time, the sample value is interpolated between these. Otherwise,
|
||||
// the single closest sample is returned verbatim.
|
||||
func (v *viewAdapter) chooseClosestSample(samples metric.Values, timestamp time.Time) *metric.SamplePair {
|
||||
func (v *viewAdapter) chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp) *metric.SamplePair {
|
||||
var closestBefore *metric.SamplePair
|
||||
var closestAfter *metric.SamplePair
|
||||
for _, candidate := range samples {
|
||||
|
@ -109,7 +109,7 @@ func (v *viewAdapter) chooseClosestSample(samples metric.Values, timestamp time.
|
|||
}
|
||||
}
|
||||
|
||||
func (v *viewAdapter) GetValueAtTime(fingerprints clientmodel.Fingerprints, timestamp time.Time) (Vector, error) {
|
||||
func (v *viewAdapter) GetValueAtTime(fingerprints clientmodel.Fingerprints, timestamp clientmodel.Timestamp) (Vector, error) {
|
||||
timer := v.stats.GetTimer(stats.GetValueAtTimeTime).Start()
|
||||
samples := Vector{}
|
||||
for _, fingerprint := range fingerprints {
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -145,7 +144,7 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
|
|||
return string(dataJSON)
|
||||
}
|
||||
|
||||
func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *metric.TieredStorage, queryStats *stats.TimerGroup) string {
|
||||
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage *metric.TieredStorage, queryStats *stats.TimerGroup) string {
|
||||
viewTimer := queryStats.GetTimer(stats.TotalViewBuildingTime).Start()
|
||||
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
|
||||
viewTimer.Stop()
|
||||
|
|
|
@ -99,7 +99,7 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
|
|||
}
|
||||
}
|
||||
|
||||
func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
|
||||
func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
|
||||
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
|
||||
analyzer := NewQueryAnalyzer(storage)
|
||||
analyzer.AnalyzeQueries(node)
|
||||
|
@ -125,7 +125,7 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.
|
|||
return NewViewAdapter(view, storage, queryStats), nil
|
||||
}
|
||||
|
||||
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
|
||||
func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
|
||||
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
|
||||
analyzer := NewQueryAnalyzer(storage)
|
||||
analyzer.AnalyzeQueries(node)
|
||||
|
|
|
@ -23,9 +23,9 @@ import (
|
|||
)
|
||||
|
||||
var testSampleInterval = time.Duration(5) * time.Minute
|
||||
var testStartTime = time.Time{}
|
||||
var testStartTime = clientmodel.Timestamp(0)
|
||||
|
||||
func getTestValueStream(startVal clientmodel.SampleValue, endVal clientmodel.SampleValue, stepVal clientmodel.SampleValue, startTime time.Time) (resultValues metric.Values) {
|
||||
func getTestValueStream(startVal clientmodel.SampleValue, endVal clientmodel.SampleValue, stepVal clientmodel.SampleValue, startTime clientmodel.Timestamp) (resultValues metric.Values) {
|
||||
currentTime := startTime
|
||||
for currentVal := startVal; currentVal <= endVal; currentVal += stepVal {
|
||||
sample := &metric.SamplePair{
|
||||
|
|
|
@ -124,7 +124,7 @@ func (m *ruleManager) queueAlertNotifications(rule *AlertingRule) {
|
|||
AlertNameLabel: clientmodel.LabelValue(rule.Name()),
|
||||
}),
|
||||
Value: aa.Value,
|
||||
ActiveSince: aa.ActiveSince,
|
||||
ActiveSince: aa.ActiveSince.Time(),
|
||||
RuleString: rule.String(),
|
||||
GeneratorUrl: m.prometheusUrl + ConsoleLinkForExpression(rule.vector.String()),
|
||||
})
|
||||
|
@ -133,7 +133,7 @@ func (m *ruleManager) queueAlertNotifications(rule *AlertingRule) {
|
|||
}
|
||||
|
||||
func (m *ruleManager) runIteration(results chan<- *extraction.Result) {
|
||||
now := time.Now()
|
||||
now := clientmodel.Now()
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
m.Lock()
|
||||
|
|
|
@ -16,7 +16,6 @@ package rules
|
|||
import (
|
||||
"fmt"
|
||||
"html/template"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -35,11 +34,11 @@ type RecordingRule struct {
|
|||
|
||||
func (rule RecordingRule) Name() string { return rule.name }
|
||||
|
||||
func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
|
||||
}
|
||||
|
||||
func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
// Get the raw value of the rule expression.
|
||||
vector, err := rule.EvalRaw(timestamp, storage)
|
||||
if err != nil {
|
||||
|
|
|
@ -15,7 +15,8 @@ package rules
|
|||
|
||||
import (
|
||||
"html/template"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/rules/ast"
|
||||
"github.com/prometheus/prometheus/storage/metric"
|
||||
|
@ -28,9 +29,9 @@ type Rule interface {
|
|||
Name() string
|
||||
// EvalRaw evaluates the rule's vector expression without triggering any
|
||||
// other actions, like recording or alerting.
|
||||
EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error)
|
||||
EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error)
|
||||
// Eval evaluates the rule, including any associated recording or alerting actions.
|
||||
Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error)
|
||||
Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error)
|
||||
// ToDotGraph returns a Graphviz dot graph of the rule.
|
||||
ToDotGraph() string
|
||||
// String returns a human-readable string representation of the rule.
|
||||
|
|
|
@ -33,7 +33,7 @@ var (
|
|||
fixturesPath = "fixtures"
|
||||
)
|
||||
|
||||
func annotateWithTime(lines []string, timestamp time.Time) []string {
|
||||
func annotateWithTime(lines []string, timestamp clientmodel.Timestamp) []string {
|
||||
annotatedLines := []string{}
|
||||
for _, line := range lines {
|
||||
annotatedLines = append(annotatedLines, fmt.Sprintf(line, timestamp))
|
||||
|
|
|
@ -28,7 +28,7 @@ type nopCurationStateUpdater struct{}
|
|||
|
||||
func (n *nopCurationStateUpdater) UpdateCurationState(*CurationState) {}
|
||||
|
||||
func generateTestSamples(endTime time.Time, numTs int, samplesPerTs int, interval time.Duration) clientmodel.Samples {
|
||||
func generateTestSamples(endTime clientmodel.Timestamp, numTs int, samplesPerTs int, interval time.Duration) clientmodel.Samples {
|
||||
samples := make(clientmodel.Samples, 0, numTs*samplesPerTs)
|
||||
|
||||
startTime := endTime.Add(-interval * time.Duration(samplesPerTs-1))
|
||||
|
|
|
@ -107,7 +107,7 @@ type watermarkScanner struct {
|
|||
// samples
|
||||
samples raw.Persistence
|
||||
// stopAt is a cue for when to stop mutating a given series.
|
||||
stopAt time.Time
|
||||
stopAt clientmodel.Timestamp
|
||||
|
||||
// stop functions as the global stop channel for all future operations.
|
||||
stop chan bool
|
||||
|
@ -128,7 +128,7 @@ type watermarkScanner struct {
|
|||
// curated.
|
||||
// curationState is the on-disk store where the curation remarks are made for
|
||||
// how much progress has been made.
|
||||
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status CurationStateUpdater) (err error) {
|
||||
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Timestamp, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status CurationStateUpdater) (err error) {
|
||||
defer func(t time.Time) {
|
||||
duration := float64(time.Since(t) / time.Millisecond)
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: 0,
|
||||
Timestamp: time.Time{},
|
||||
Timestamp: 0,
|
||||
Metric: clientmodel.Metric{
|
||||
clientmodel.MetricNameLabel: "my_metric",
|
||||
"request_type": "your_mom",
|
||||
|
@ -34,7 +34,7 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
|
|||
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: 0,
|
||||
Timestamp: time.Time{},
|
||||
Timestamp: 0,
|
||||
Metric: clientmodel.Metric{
|
||||
clientmodel.MetricNameLabel: "my_metric",
|
||||
"request_type": "your_dad",
|
||||
|
@ -81,7 +81,7 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
|
|||
func GetFingerprintsForLabelNameTests(p MetricPersistence, t test.Tester) {
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: 0,
|
||||
Timestamp: time.Time{},
|
||||
Timestamp: 0,
|
||||
Metric: clientmodel.Metric{
|
||||
clientmodel.MetricNameLabel: "my_metric",
|
||||
"request_type": "your_mom",
|
||||
|
@ -91,7 +91,7 @@ func GetFingerprintsForLabelNameTests(p MetricPersistence, t test.Tester) {
|
|||
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: 0,
|
||||
Timestamp: time.Time{},
|
||||
Timestamp: 0,
|
||||
Metric: clientmodel.Metric{
|
||||
clientmodel.MetricNameLabel: "my_metric",
|
||||
"request_type": "your_dad",
|
||||
|
@ -147,7 +147,7 @@ func GetFingerprintsForLabelNameTests(p MetricPersistence, t test.Tester) {
|
|||
func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: 0,
|
||||
Timestamp: time.Time{},
|
||||
Timestamp: 0,
|
||||
Metric: clientmodel.Metric{
|
||||
"request_type": "your_mom",
|
||||
},
|
||||
|
@ -155,7 +155,7 @@ func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
|
|||
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: 0,
|
||||
Timestamp: time.Time{},
|
||||
Timestamp: 0,
|
||||
Metric: clientmodel.Metric{
|
||||
"request_type": "your_dad",
|
||||
"one-off": "value",
|
||||
|
@ -263,7 +263,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
|
|||
|
||||
for i := 0; i < increments; i++ {
|
||||
for j := 0; j < repetitions; j++ {
|
||||
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: clientmodel.SampleValue(i),
|
||||
Timestamp: time,
|
||||
|
@ -293,7 +293,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
|
|||
t.Fatalf("expected %d fingerprints, got %d", 1, len(fingerprints))
|
||||
}
|
||||
|
||||
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
samples := p.GetValueAtTime(fingerprints[0], time)
|
||||
if len(samples) == 0 {
|
||||
t.Fatal("expected at least one sample.")
|
||||
|
@ -323,7 +323,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
|
|||
s := clientmodel.Samples{}
|
||||
for i := 0; i < increments; i++ {
|
||||
for j := 0; j < repetitions; j++ {
|
||||
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
s = append(s, &clientmodel.Sample{
|
||||
Value: clientmodel.SampleValue(i),
|
||||
Timestamp: time,
|
||||
|
@ -355,7 +355,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
|
|||
t.Fatalf("expected %d fingerprints, got %d", 1, len(fingerprints))
|
||||
}
|
||||
|
||||
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||
samples := p.GetValueAtTime(fingerprints[0], time)
|
||||
if len(samples) == 0 {
|
||||
t.Fatal("expected at least one sample.")
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
var (
|
||||
// ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20``
|
||||
usEastern, _ = time.LoadLocation("US/Eastern")
|
||||
testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC)
|
||||
testInstant = clientmodel.TimestampFromTime(time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC))
|
||||
)
|
||||
|
||||
func testAppendSamples(p MetricPersistence, s *clientmodel.Sample, t test.Tester) {
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
package metric
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
|
@ -46,7 +44,7 @@ type MetricPersistence interface {
|
|||
GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error)
|
||||
|
||||
// Get the two metric values that are immediately adjacent to a given time.
|
||||
GetValueAtTime(*clientmodel.Fingerprint, time.Time) Values
|
||||
GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values
|
||||
// Get the boundary values of an interval: the first value older than the
|
||||
// interval start, and the first value younger than the interval end.
|
||||
GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values
|
||||
|
@ -59,7 +57,7 @@ type MetricPersistence interface {
|
|||
// View provides a view of the values in the datastore subject to the request
|
||||
// of a preloading operation.
|
||||
type View interface {
|
||||
GetValueAtTime(*clientmodel.Fingerprint, time.Time) Values
|
||||
GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values
|
||||
GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values
|
||||
GetRangeValues(*clientmodel.Fingerprint, Interval) Values
|
||||
|
||||
|
|
|
@ -526,7 +526,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetValueAtTime(f *clientmodel.Fingerprint, t time.Time) Values {
|
||||
func (l *LevelDBMetricPersistence) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
|
@ -663,7 +663,7 @@ func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
|
|||
}
|
||||
}
|
||||
|
||||
type MetricSamplesDecoder struct {}
|
||||
type MetricSamplesDecoder struct{}
|
||||
|
||||
func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
|
||||
key := &dto.SampleKey{}
|
||||
|
@ -688,7 +688,7 @@ func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error)
|
|||
return NewValuesFromDTO(values), nil
|
||||
}
|
||||
|
||||
type AcceptAllFilter struct {}
|
||||
type AcceptAllFilter struct{}
|
||||
|
||||
func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult {
|
||||
return storage.ACCEPT
|
||||
|
|
|
@ -16,7 +16,6 @@ package metric
|
|||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -45,14 +44,14 @@ type stream interface {
|
|||
add(...*SamplePair)
|
||||
|
||||
clone() Values
|
||||
expunge(age time.Time) Values
|
||||
expunge(age clientmodel.Timestamp) Values
|
||||
|
||||
size() int
|
||||
clear()
|
||||
|
||||
metric() clientmodel.Metric
|
||||
|
||||
getValueAtTime(t time.Time) Values
|
||||
getValueAtTime(t clientmodel.Timestamp) Values
|
||||
getBoundaryValues(in Interval) Values
|
||||
getRangeValues(in Interval) Values
|
||||
}
|
||||
|
@ -85,7 +84,7 @@ func (s *arrayStream) clone() Values {
|
|||
return clone
|
||||
}
|
||||
|
||||
func (s *arrayStream) expunge(t time.Time) Values {
|
||||
func (s *arrayStream) expunge(t clientmodel.Timestamp) Values {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
|
@ -100,7 +99,7 @@ func (s *arrayStream) expunge(t time.Time) Values {
|
|||
return expunged
|
||||
}
|
||||
|
||||
func (s *arrayStream) getValueAtTime(t time.Time) Values {
|
||||
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) Values {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
|
@ -261,7 +260,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, finge
|
|||
return series
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clientmodel.Samples) {
|
||||
func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) {
|
||||
emptySeries := []clientmodel.Fingerprint{}
|
||||
|
||||
s.RLock()
|
||||
|
@ -418,7 +417,7 @@ func (s *memorySeriesStorage) CloneSamples(f *clientmodel.Fingerprint) Values {
|
|||
return series.clone()
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t time.Time) Values {
|
||||
func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ func BenchmarkStreamAdd(b *testing.B) {
|
|||
samples := make(Values, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
samples = append(samples, &SamplePair{
|
||||
Timestamp: time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
Timestamp: clientmodel.TimestampFromTime(time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC)),
|
||||
Value: clientmodel.SampleValue(i),
|
||||
})
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ func benchmarkAppendSamples(b *testing.B, labels int) {
|
|||
samples = append(samples, &clientmodel.Sample{
|
||||
Metric: metric,
|
||||
Value: clientmodel.SampleValue(i),
|
||||
Timestamp: time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
Timestamp: clientmodel.TimestampFromTime(time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC)),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -18,19 +18,21 @@ import (
|
|||
"math"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
// Encapsulates a primitive query operation.
|
||||
type op interface {
|
||||
// The time at which this operation starts.
|
||||
StartsAt() time.Time
|
||||
StartsAt() clientmodel.Timestamp
|
||||
// Extract samples from stream of values and advance operation time.
|
||||
ExtractSamples(Values) Values
|
||||
// Return whether the operator has consumed all data it needs.
|
||||
Consumed() bool
|
||||
// Get current operation time or nil if no subsequent work associated with
|
||||
// this operator remains.
|
||||
CurrentTime() *time.Time
|
||||
CurrentTime() clientmodel.Timestamp
|
||||
// GreedierThan indicates whether this present operation should take
|
||||
// precedence over the other operation due to greediness.
|
||||
//
|
||||
|
@ -62,7 +64,7 @@ func (o ops) Swap(i, j int) {
|
|||
|
||||
// Encapsulates getting values at or adjacent to a specific time.
|
||||
type getValuesAtTimeOp struct {
|
||||
time time.Time
|
||||
time clientmodel.Timestamp
|
||||
consumed bool
|
||||
}
|
||||
|
||||
|
@ -70,7 +72,7 @@ func (g *getValuesAtTimeOp) String() string {
|
|||
return fmt.Sprintf("getValuesAtTimeOp at %s", g.time)
|
||||
}
|
||||
|
||||
func (g *getValuesAtTimeOp) StartsAt() time.Time {
|
||||
func (g *getValuesAtTimeOp) StartsAt() clientmodel.Timestamp {
|
||||
return g.time
|
||||
}
|
||||
|
||||
|
@ -101,7 +103,7 @@ func (g *getValuesAtTimeOp) GreedierThan(op op) (superior bool) {
|
|||
// are adjacent to it.
|
||||
//
|
||||
// An assumption of this is that the provided samples are already sorted!
|
||||
func extractValuesAroundTime(t time.Time, in Values) (out Values) {
|
||||
func extractValuesAroundTime(t clientmodel.Timestamp, in Values) (out Values) {
|
||||
i := sort.Search(len(in), func(i int) bool {
|
||||
return !in[i].Timestamp.Before(t)
|
||||
})
|
||||
|
@ -126,8 +128,8 @@ func extractValuesAroundTime(t time.Time, in Values) (out Values) {
|
|||
return
|
||||
}
|
||||
|
||||
func (g getValuesAtTimeOp) CurrentTime() *time.Time {
|
||||
return &g.time
|
||||
func (g getValuesAtTimeOp) CurrentTime() clientmodel.Timestamp {
|
||||
return g.time
|
||||
}
|
||||
|
||||
func (g getValuesAtTimeOp) Consumed() bool {
|
||||
|
@ -136,8 +138,8 @@ func (g getValuesAtTimeOp) Consumed() bool {
|
|||
|
||||
// Encapsulates getting values at a given interval over a duration.
|
||||
type getValuesAtIntervalOp struct {
|
||||
from time.Time
|
||||
through time.Time
|
||||
from clientmodel.Timestamp
|
||||
through clientmodel.Timestamp
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
|
@ -145,11 +147,11 @@ func (o *getValuesAtIntervalOp) String() string {
|
|||
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", o.from, o.interval, o.through)
|
||||
}
|
||||
|
||||
func (g *getValuesAtIntervalOp) StartsAt() time.Time {
|
||||
func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp {
|
||||
return g.from
|
||||
}
|
||||
|
||||
func (g *getValuesAtIntervalOp) Through() time.Time {
|
||||
func (g *getValuesAtIntervalOp) Through() clientmodel.Timestamp {
|
||||
return g.through
|
||||
}
|
||||
|
||||
|
@ -161,7 +163,7 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
|
|||
for len(in) > 0 {
|
||||
out = append(out, extractValuesAroundTime(g.from, in)...)
|
||||
lastExtractedTime := out[len(out)-1].Timestamp
|
||||
in = in.TruncateBefore(lastExtractedTime.Add(1))
|
||||
in = in.TruncateBefore(lastExtractedTime.Add(clientmodel.MinimumTick))
|
||||
g.from = g.from.Add(g.interval)
|
||||
for !g.from.After(lastExtractedTime) {
|
||||
g.from = g.from.Add(g.interval)
|
||||
|
@ -176,8 +178,8 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
|
|||
return
|
||||
}
|
||||
|
||||
func (g *getValuesAtIntervalOp) CurrentTime() *time.Time {
|
||||
return &g.from
|
||||
func (g *getValuesAtIntervalOp) CurrentTime() clientmodel.Timestamp {
|
||||
return g.from
|
||||
}
|
||||
|
||||
func (g *getValuesAtIntervalOp) Consumed() bool {
|
||||
|
@ -199,19 +201,19 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
|
|||
|
||||
// Encapsulates getting all values in a given range.
|
||||
type getValuesAlongRangeOp struct {
|
||||
from time.Time
|
||||
through time.Time
|
||||
from clientmodel.Timestamp
|
||||
through clientmodel.Timestamp
|
||||
}
|
||||
|
||||
func (o *getValuesAlongRangeOp) String() string {
|
||||
return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", o.from, o.through)
|
||||
}
|
||||
|
||||
func (g *getValuesAlongRangeOp) StartsAt() time.Time {
|
||||
func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp {
|
||||
return g.from
|
||||
}
|
||||
|
||||
func (g *getValuesAlongRangeOp) Through() time.Time {
|
||||
func (g *getValuesAlongRangeOp) Through() clientmodel.Timestamp {
|
||||
return g.through
|
||||
}
|
||||
|
||||
|
@ -227,7 +229,7 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
|
|||
// No samples at or after operator start time. This can only happen if we
|
||||
// try applying the operator to a time after the last recorded sample. In
|
||||
// this case, we're finished.
|
||||
g.from = g.through.Add(1)
|
||||
g.from = g.through.Add(clientmodel.MinimumTick)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -236,7 +238,7 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
|
|||
return in[i].Timestamp.After(g.through)
|
||||
})
|
||||
if lastIdx == firstIdx {
|
||||
g.from = g.through.Add(1)
|
||||
g.from = g.through.Add(clientmodel.MinimumTick)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -248,8 +250,8 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
|
|||
return in[firstIdx:lastIdx]
|
||||
}
|
||||
|
||||
func (g *getValuesAlongRangeOp) CurrentTime() *time.Time {
|
||||
return &g.from
|
||||
func (g *getValuesAlongRangeOp) CurrentTime() clientmodel.Timestamp {
|
||||
return g.from
|
||||
}
|
||||
|
||||
func (g *getValuesAlongRangeOp) Consumed() bool {
|
||||
|
@ -275,22 +277,22 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
|
|||
// incremented by interval and from is reset to through-rangeDuration. Returns
|
||||
// current time nil when from > totalThrough.
|
||||
type getValueRangeAtIntervalOp struct {
|
||||
rangeFrom time.Time
|
||||
rangeThrough time.Time
|
||||
rangeFrom clientmodel.Timestamp
|
||||
rangeThrough clientmodel.Timestamp
|
||||
rangeDuration time.Duration
|
||||
interval time.Duration
|
||||
through time.Time
|
||||
through clientmodel.Timestamp
|
||||
}
|
||||
|
||||
func (o *getValueRangeAtIntervalOp) String() string {
|
||||
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through)
|
||||
}
|
||||
|
||||
func (g *getValueRangeAtIntervalOp) StartsAt() time.Time {
|
||||
func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp {
|
||||
return g.rangeFrom
|
||||
}
|
||||
|
||||
func (g *getValueRangeAtIntervalOp) Through() time.Time {
|
||||
func (g *getValueRangeAtIntervalOp) Through() clientmodel.Timestamp {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
|
@ -311,7 +313,7 @@ func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
|
|||
// No samples at or after operator start time. This can only happen if we
|
||||
// try applying the operator to a time after the last recorded sample. In
|
||||
// this case, we're finished.
|
||||
g.rangeFrom = g.through.Add(1)
|
||||
g.rangeFrom = g.through.Add(clientmodel.MinimumTick)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -337,8 +339,8 @@ func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
|
|||
return in[firstIdx:lastIdx]
|
||||
}
|
||||
|
||||
func (g *getValueRangeAtIntervalOp) CurrentTime() *time.Time {
|
||||
return &g.rangeFrom
|
||||
func (g *getValueRangeAtIntervalOp) CurrentTime() clientmodel.Timestamp {
|
||||
return g.rangeFrom
|
||||
}
|
||||
|
||||
func (g *getValueRangeAtIntervalOp) Consumed() bool {
|
||||
|
@ -376,7 +378,7 @@ func (s rangeDurationSorter) Less(i, j int) bool {
|
|||
type durationOperator interface {
|
||||
op
|
||||
|
||||
Through() time.Time
|
||||
Through() clientmodel.Timestamp
|
||||
}
|
||||
|
||||
// greedinessSort sorts the operations in descending order by level of
|
||||
|
@ -565,7 +567,7 @@ func optimizeForwardGetValuesAlongRange(headOp *getValuesAlongRangeOp, unoptimiz
|
|||
|
||||
// selectQueriesForTime chooses all subsequent operations from the slice that
|
||||
// have the same start time as the provided time and emits them.
|
||||
func selectQueriesForTime(time time.Time, queries ops) (out ops) {
|
||||
func selectQueriesForTime(time clientmodel.Timestamp, queries ops) (out ops) {
|
||||
if len(queries) == 0 {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -241,86 +241,88 @@ func testOptimizeTimeGroups(t test.Tester) {
|
|||
},
|
||||
},
|
||||
},
|
||||
// Include Truncated Intervals with Range.
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
/*
|
||||
// Include Truncated Intervals with Range.
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
},
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(30 * time.Second),
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
// Compacted Forward Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(30 * time.Second),
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Compacted Forward Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
// Compacted Tail Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
// Compacted Tail Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
*/
|
||||
// Regression Validation 1: Multiple Overlapping Interval Requests
|
||||
// This one specific case expects no mutation.
|
||||
{
|
||||
|
@ -865,123 +867,125 @@ func testOptimize(t test.Tester) {
|
|||
},
|
||||
},
|
||||
},
|
||||
// Different range with different interval; return best.
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
/*
|
||||
// Different range with different interval; return best.
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 5,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 5,
|
||||
},
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 5,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 5,
|
||||
out: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 5,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 5,
|
||||
// Include Truncated Intervals with Range.
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
},
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(30 * time.Second),
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Include Truncated Intervals with Range.
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
// Compacted Forward Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(30 * time.Second),
|
||||
// Compacted Tail Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(30 * time.Second),
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Compacted Forward Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
// Compacted Tail Truncation
|
||||
{
|
||||
in: ops{
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
},
|
||||
out: ops{
|
||||
&getValuesAlongRangeOp{
|
||||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
&getValuesAtIntervalOp{
|
||||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: time.Second * 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
*/
|
||||
// Compact Interval with Subservient Range
|
||||
{
|
||||
in: ops{
|
||||
|
|
|
@ -15,7 +15,6 @@ package metric
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
|
||||
|
@ -44,7 +43,7 @@ type Processor interface {
|
|||
//
|
||||
// Upon completion or error, the last time at which the processor finished
|
||||
// shall be emitted in addition to any errors.
|
||||
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *clientmodel.Fingerprint) (lastCurated time.Time, err error)
|
||||
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error)
|
||||
}
|
||||
|
||||
// CompactionProcessor combines sparse values in the database together such
|
||||
|
@ -83,7 +82,7 @@ func (p *CompactionProcessor) String() string {
|
|||
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize)
|
||||
}
|
||||
|
||||
func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *clientmodel.Fingerprint) (lastCurated time.Time, err error) {
|
||||
func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) {
|
||||
var pendingBatch raw.Batch = nil
|
||||
|
||||
defer func() {
|
||||
|
@ -95,7 +94,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
var pendingMutations = 0
|
||||
var pendingSamples Values
|
||||
var unactedSamples Values
|
||||
var lastTouchedTime time.Time
|
||||
var lastTouchedTime clientmodel.Timestamp
|
||||
var keyDropped bool
|
||||
|
||||
sampleKey, _ := p.sampleKeys.Get()
|
||||
|
@ -185,7 +184,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
pendingBatch.Put(k, b)
|
||||
|
||||
pendingMutations++
|
||||
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
|
||||
lastCurated = newSampleKey.FirstTimestamp
|
||||
if len(unactedSamples) > 0 {
|
||||
if !keyDropped {
|
||||
sampleKey.Dump(k)
|
||||
|
@ -235,7 +234,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
pendingBatch.Put(k, b)
|
||||
pendingSamples = Values{}
|
||||
pendingMutations++
|
||||
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
|
||||
lastCurated = newSampleKey.FirstTimestamp
|
||||
}
|
||||
|
||||
// This is not deferred due to the off-chance that a pre-existing commit
|
||||
|
@ -310,7 +309,7 @@ func (p *DeletionProcessor) String() string {
|
|||
return "deletionProcessor"
|
||||
}
|
||||
|
||||
func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *clientmodel.Fingerprint) (lastCurated time.Time, err error) {
|
||||
func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) {
|
||||
var pendingBatch raw.Batch = nil
|
||||
|
||||
defer func() {
|
||||
|
|
|
@ -31,13 +31,13 @@ import (
|
|||
type curationState struct {
|
||||
fingerprint string
|
||||
ignoreYoungerThan time.Duration
|
||||
lastCurated time.Time
|
||||
lastCurated clientmodel.Timestamp
|
||||
processor Processor
|
||||
}
|
||||
|
||||
type watermarkState struct {
|
||||
fingerprint string
|
||||
lastAppended time.Time
|
||||
lastAppended clientmodel.Timestamp
|
||||
}
|
||||
|
||||
type sampleGroup struct {
|
||||
|
|
|
@ -15,7 +15,6 @@ package metric
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -40,7 +39,7 @@ func GetFingerprintsForLabelSetUsesAndForLabelMatchingTests(p MetricPersistence,
|
|||
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: clientmodel.SampleValue(0.0),
|
||||
Timestamp: time.Now(),
|
||||
Timestamp: clientmodel.Now(),
|
||||
Metric: m,
|
||||
}, t)
|
||||
}
|
||||
|
|
|
@ -329,14 +329,14 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
|
|||
for _, value := range context.values {
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: clientmodel.SampleValue(value.value),
|
||||
Timestamp: time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC),
|
||||
Timestamp: clientmodel.TimestampFromTime(time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC)),
|
||||
Metric: m,
|
||||
}, t)
|
||||
}
|
||||
|
||||
for j, behavior := range context.behaviors {
|
||||
input := behavior.input
|
||||
time := time.Date(input.year, input.month, input.day, input.hour, 0, 0, 0, time.UTC)
|
||||
time := clientmodel.TimestampFromTime(time.Date(input.year, input.month, input.day, input.hour, 0, 0, 0, time.UTC))
|
||||
fingerprint := &clientmodel.Fingerprint{}
|
||||
fingerprint.LoadFromMetric(m)
|
||||
actual := p.GetValueAtTime(fingerprint, time)
|
||||
|
@ -821,15 +821,15 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
|
|||
for _, value := range context.values {
|
||||
testAppendSamples(p, &clientmodel.Sample{
|
||||
Value: clientmodel.SampleValue(value.value),
|
||||
Timestamp: time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC),
|
||||
Timestamp: clientmodel.TimestampFromTime(time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC)),
|
||||
Metric: m,
|
||||
}, t)
|
||||
}
|
||||
|
||||
for j, behavior := range context.behaviors {
|
||||
input := behavior.input
|
||||
open := time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC)
|
||||
end := time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC)
|
||||
open := clientmodel.TimestampFromTime(time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC))
|
||||
end := clientmodel.TimestampFromTime(time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC))
|
||||
in := Interval{
|
||||
OldestInclusive: open,
|
||||
NewestInclusive: end,
|
||||
|
@ -873,11 +873,11 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
|
|||
t.Fatalf("%d.%d.%d(%s). Expected %v but got: %v\n", i, j, k, behavior.name, expected.value, actual.Value)
|
||||
}
|
||||
|
||||
if actual.Timestamp.Year() != expected.year {
|
||||
t.Fatalf("%d.%d.%d(%s). Expected %d but got: %d\n", i, j, k, behavior.name, expected.year, actual.Timestamp.Year())
|
||||
if actual.Timestamp.Time().Year() != expected.year {
|
||||
t.Fatalf("%d.%d.%d(%s). Expected %d but got: %d\n", i, j, k, behavior.name, expected.year, actual.Timestamp.Time().Year())
|
||||
}
|
||||
if actual.Timestamp.Month() != expected.month {
|
||||
t.Fatalf("%d.%d.%d(%s). Expected %d but got: %d\n", i, j, k, behavior.name, expected.month, actual.Timestamp.Month())
|
||||
if actual.Timestamp.Time().Month() != expected.month {
|
||||
t.Fatalf("%d.%d.%d(%s). Expected %d but got: %d\n", i, j, k, behavior.name, expected.month, actual.Timestamp.Time().Month())
|
||||
}
|
||||
// XXX: Find problem here.
|
||||
// Mismatches occur in this and have for a long time in the LevelDB
|
||||
|
|
|
@ -1,10 +1,22 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
|
||||
|
@ -14,12 +26,12 @@ import (
|
|||
)
|
||||
|
||||
func (s SamplePair) MarshalJSON() ([]byte, error) {
|
||||
return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp.Unix())), nil
|
||||
return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil
|
||||
}
|
||||
|
||||
type SamplePair struct {
|
||||
Value clientmodel.SampleValue
|
||||
Timestamp time.Time
|
||||
Timestamp clientmodel.Timestamp
|
||||
}
|
||||
|
||||
func (s *SamplePair) Equal(o *SamplePair) bool {
|
||||
|
@ -72,19 +84,19 @@ func (v Values) Equal(o Values) bool {
|
|||
|
||||
// FirstTimeAfter indicates whether the first sample of a set is after a given
|
||||
// timestamp.
|
||||
func (v Values) FirstTimeAfter(t time.Time) bool {
|
||||
func (v Values) FirstTimeAfter(t clientmodel.Timestamp) bool {
|
||||
return v[0].Timestamp.After(t)
|
||||
}
|
||||
|
||||
// LastTimeBefore indicates whether the last sample of a set is before a given
|
||||
// timestamp.
|
||||
func (v Values) LastTimeBefore(t time.Time) bool {
|
||||
func (v Values) LastTimeBefore(t clientmodel.Timestamp) bool {
|
||||
return v[len(v)-1].Timestamp.Before(t)
|
||||
}
|
||||
|
||||
// InsideInterval indicates whether a given range of sorted values could contain
|
||||
// a value for a given time.
|
||||
func (v Values) InsideInterval(t time.Time) bool {
|
||||
func (v Values) InsideInterval(t clientmodel.Timestamp) bool {
|
||||
switch {
|
||||
case v.Len() == 0:
|
||||
return false
|
||||
|
@ -100,7 +112,7 @@ func (v Values) InsideInterval(t time.Time) bool {
|
|||
// TruncateBefore returns a subslice of the original such that extraneous
|
||||
// samples in the collection that occur before the provided time are
|
||||
// dropped. The original slice is not mutated
|
||||
func (v Values) TruncateBefore(t time.Time) Values {
|
||||
func (v Values) TruncateBefore(t clientmodel.Timestamp) Values {
|
||||
index := sort.Search(len(v), func(i int) bool {
|
||||
timestamp := v[i].Timestamp
|
||||
|
||||
|
@ -151,7 +163,7 @@ func NewValuesFromDTO(d *dto.SampleValueSeries) Values {
|
|||
|
||||
for _, value := range d.Value {
|
||||
v = append(v, &SamplePair{
|
||||
Timestamp: time.Unix(value.GetTimestamp(), 0).UTC(),
|
||||
Timestamp: clientmodel.TimestampFromUnix(value.GetTimestamp()),
|
||||
Value: clientmodel.SampleValue(value.GetValue()),
|
||||
})
|
||||
}
|
||||
|
@ -165,6 +177,6 @@ type SampleSet struct {
|
|||
}
|
||||
|
||||
type Interval struct {
|
||||
OldestInclusive time.Time
|
||||
NewestInclusive time.Time
|
||||
OldestInclusive clientmodel.Timestamp
|
||||
NewestInclusive clientmodel.Timestamp
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ package metric
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
|
||||
|
@ -30,8 +29,8 @@ import (
|
|||
// SampleKey.
|
||||
type SampleKey struct {
|
||||
Fingerprint *clientmodel.Fingerprint
|
||||
FirstTimestamp time.Time
|
||||
LastTimestamp time.Time
|
||||
FirstTimestamp clientmodel.Timestamp
|
||||
LastTimestamp clientmodel.Timestamp
|
||||
SampleCount uint32
|
||||
}
|
||||
|
||||
|
@ -71,7 +70,7 @@ func (s *SampleKey) Equal(o *SampleKey) bool {
|
|||
// MayContain indicates whether the given SampleKey could potentially contain a
|
||||
// value at the provided time. Even if true is emitted, that does not mean a
|
||||
// satisfactory value, in fact, exists.
|
||||
func (s *SampleKey) MayContain(t time.Time) bool {
|
||||
func (s *SampleKey) MayContain(t clientmodel.Timestamp) bool {
|
||||
switch {
|
||||
case t.Before(s.FirstTimestamp):
|
||||
return false
|
||||
|
@ -82,7 +81,7 @@ func (s *SampleKey) MayContain(t time.Time) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t time.Time) bool {
|
||||
func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp) bool {
|
||||
if s.Fingerprint.Less(fp) {
|
||||
return true
|
||||
}
|
||||
|
@ -118,6 +117,6 @@ func (s *SampleKey) Load(d *dto.SampleKey) {
|
|||
loadFingerprint(f, d.GetFingerprint())
|
||||
s.Fingerprint = f
|
||||
s.FirstTimestamp = indexable.DecodeTime(d.Timestamp)
|
||||
s.LastTimestamp = time.Unix(d.GetLastTimestamp(), 0).UTC()
|
||||
s.LastTimestamp = clientmodel.TimestampFromUnix(d.GetLastTimestamp())
|
||||
s.SampleCount = d.GetSampleCount()
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"sort"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -95,7 +94,7 @@ func ReadEmptyTests(p MetricPersistence, t test.Tester) {
|
|||
func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) {
|
||||
appendSample := func(x int) (success bool) {
|
||||
v := clientmodel.SampleValue(x)
|
||||
ts := time.Unix(int64(x), int64(x))
|
||||
ts := clientmodel.TimestampFromUnix(int64(x))
|
||||
labelName := clientmodel.LabelName(x)
|
||||
labelValue := clientmodel.LabelValue(x)
|
||||
l := clientmodel.Metric{labelName: labelValue}
|
||||
|
@ -124,7 +123,7 @@ func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) {
|
|||
func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester) {
|
||||
appendSample := func(x int) (success bool) {
|
||||
v := clientmodel.SampleValue(x)
|
||||
ts := time.Unix(int64(x), int64(x))
|
||||
ts := clientmodel.TimestampFromUnix(int64(x))
|
||||
labelName := clientmodel.LabelName(x)
|
||||
labelValue := clientmodel.LabelValue(x)
|
||||
l := clientmodel.Metric{labelName: labelValue}
|
||||
|
@ -175,7 +174,7 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste
|
|||
appendSample := func(x int) bool {
|
||||
sample := &clientmodel.Sample{
|
||||
Value: clientmodel.SampleValue(x),
|
||||
Timestamp: time.Unix(int64(x), 0),
|
||||
Timestamp: clientmodel.TimestampFromUnix(int64(x)),
|
||||
Metric: clientmodel.Metric{clientmodel.MetricNameLabel: "my_metric"},
|
||||
}
|
||||
|
||||
|
@ -227,7 +226,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerpr
|
|||
return
|
||||
}
|
||||
|
||||
type timeslice []time.Time
|
||||
type timeslice []clientmodel.Timestamp
|
||||
|
||||
func (t timeslice) Len() int {
|
||||
return len(t)
|
||||
|
@ -313,7 +312,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
|
|||
// BUG(matt): Invariant of the in-memory database assumes this.
|
||||
sortedTimestamps := timeslice{}
|
||||
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
|
||||
sortedTimestamps = append(sortedTimestamps, time.Unix(nextTimestamp(), 0))
|
||||
sortedTimestamps = append(sortedTimestamps, clientmodel.TimestampFromUnix(nextTimestamp()))
|
||||
}
|
||||
sort.Sort(sortedTimestamps)
|
||||
|
||||
|
@ -465,8 +464,8 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
|
|||
}
|
||||
|
||||
interval := Interval{
|
||||
OldestInclusive: time.Unix(begin, 0),
|
||||
NewestInclusive: time.Unix(end, 0),
|
||||
OldestInclusive: clientmodel.TimestampFromUnix(begin),
|
||||
NewestInclusive: clientmodel.TimestampFromUnix(end),
|
||||
}
|
||||
|
||||
samples := Values{}
|
||||
|
|
|
@ -35,7 +35,7 @@ type chunk Values
|
|||
// dropped. The original slice is not mutated. It works with the assumption
|
||||
// that consumers of these values could want preceding values if none would
|
||||
// exist prior to the defined time.
|
||||
func (c chunk) TruncateBefore(t time.Time) chunk {
|
||||
func (c chunk) TruncateBefore(t clientmodel.Timestamp) chunk {
|
||||
index := sort.Search(len(c), func(i int) bool {
|
||||
timestamp := c[i].Timestamp
|
||||
|
||||
|
@ -112,7 +112,7 @@ const (
|
|||
|
||||
const watermarkCacheLimit = 1024 * 1024
|
||||
|
||||
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) {
|
||||
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) {
|
||||
if isDir, _ := utility.IsDir(rootDirectory); !isDir {
|
||||
return nil, fmt.Errorf("Could not find metrics directory %s", rootDirectory)
|
||||
}
|
||||
|
@ -286,7 +286,7 @@ func (t *TieredStorage) Flush() {
|
|||
}
|
||||
|
||||
func (t *TieredStorage) flushMemory(ttl time.Duration) {
|
||||
flushOlderThan := time.Now().Add(-1 * ttl)
|
||||
flushOlderThan := clientmodel.Now().Add(-1 * ttl)
|
||||
|
||||
glog.Info("Flushing samples to disk...")
|
||||
t.memoryArena.Flush(flushOlderThan, t.appendToDiskQueue)
|
||||
|
@ -336,7 +336,7 @@ func (t *TieredStorage) close() {
|
|||
t.state = tieredStorageStopping
|
||||
}
|
||||
|
||||
func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (bool, error) {
|
||||
func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i clientmodel.Timestamp) (bool, error) {
|
||||
// BUG(julius): Make this configurable by query layer.
|
||||
i = i.Add(-stalenessLimit)
|
||||
|
||||
|
@ -401,7 +401,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
|
||||
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
|
||||
for _, scanJob := range scans {
|
||||
old, err := t.seriesTooOld(scanJob.fingerprint, *scanJob.operations[0].CurrentTime())
|
||||
old, err := t.seriesTooOld(scanJob.fingerprint, scanJob.operations[0].CurrentTime())
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err)
|
||||
continue
|
||||
|
@ -420,7 +420,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
}
|
||||
|
||||
// Load data value chunk(s) around the first standing op's current time.
|
||||
targetTime := *standingOps[0].CurrentTime()
|
||||
targetTime := standingOps[0].CurrentTime()
|
||||
|
||||
currentChunk := chunk{}
|
||||
// If we aimed before the oldest value in memory, load more data from disk.
|
||||
|
@ -494,7 +494,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
break
|
||||
}
|
||||
|
||||
currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime()))
|
||||
currentChunk = currentChunk.TruncateBefore(op.CurrentTime())
|
||||
|
||||
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
|
||||
out = op.ExtractSamples(Values(currentChunk))
|
||||
|
@ -537,7 +537,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
return
|
||||
}
|
||||
|
||||
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts time.Time, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) {
|
||||
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts clientmodel.Timestamp, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) {
|
||||
if fingerprint.Less(firstBlock.Fingerprint) {
|
||||
return nil, false
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import (
|
|||
"github.com/prometheus/prometheus/utility/test"
|
||||
)
|
||||
|
||||
func buildSamples(from, to time.Time, interval time.Duration, m clientmodel.Metric) (v clientmodel.Samples) {
|
||||
func buildSamples(from, to clientmodel.Timestamp, interval time.Duration, m clientmodel.Metric) (v clientmodel.Samples) {
|
||||
i := clientmodel.SampleValue(0)
|
||||
|
||||
for from.Before(to) {
|
||||
|
@ -57,7 +57,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
|
|||
fingerprint := &clientmodel.Fingerprint{}
|
||||
fingerprint.LoadFromMetric(metric)
|
||||
var (
|
||||
instant = time.Date(1984, 3, 30, 0, 0, 0, 0, time.Local)
|
||||
instant = clientmodel.TimestampFromTime(time.Date(1984, 3, 30, 0, 0, 0, 0, time.Local))
|
||||
scenarios = []struct {
|
||||
data clientmodel.Samples
|
||||
in in
|
||||
|
@ -315,11 +315,11 @@ func testMakeView(t test.Tester, flushToDisk bool) {
|
|||
},
|
||||
// Two chunks of samples, query asks for values from first chunk.
|
||||
{
|
||||
data: buildSamples(instant, instant.Add(time.Duration(*leveldbChunkSize*2)*time.Second), time.Second, metric),
|
||||
data: buildSamples(instant, instant.Add(time.Duration(*leveldbChunkSize*4)*time.Second), 2*time.Second, metric),
|
||||
in: in{
|
||||
atTime: []getValuesAtTimeOp{
|
||||
{
|
||||
time: instant.Add(time.Second*time.Duration(*leveldbChunkSize/2) + 1),
|
||||
time: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2) + clientmodel.MinimumTick),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -327,12 +327,12 @@ func testMakeView(t test.Tester, flushToDisk bool) {
|
|||
atTime: []Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize/2)),
|
||||
Value: 100,
|
||||
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2)),
|
||||
Value: 200,
|
||||
},
|
||||
{
|
||||
Timestamp: instant.Add(time.Second * (time.Duration(*leveldbChunkSize/2) + 1)),
|
||||
Value: 101,
|
||||
Timestamp: instant.Add(time.Second * (time.Duration(*leveldbChunkSize*2) + 2)),
|
||||
Value: 201,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -574,9 +574,9 @@ func TestGetFingerprintsForLabelSet(t *testing.T) {
|
|||
func testTruncateBefore(t test.Tester) {
|
||||
type in struct {
|
||||
values Values
|
||||
time time.Time
|
||||
time clientmodel.Timestamp
|
||||
}
|
||||
instant := time.Now()
|
||||
instant := clientmodel.Now()
|
||||
var scenarios = []struct {
|
||||
in in
|
||||
out Values
|
||||
|
|
|
@ -30,9 +30,9 @@ var (
|
|||
// Represents the summation of all datastore queries that shall be performed to
|
||||
// extract values. Each operation mutates the state of the builder.
|
||||
type ViewRequestBuilder interface {
|
||||
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time)
|
||||
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration)
|
||||
GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time)
|
||||
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp)
|
||||
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration)
|
||||
GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp)
|
||||
ScanJobs() scanJobs
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ var getValuesAtTimes = newValueAtTimeList(10 * 1024)
|
|||
|
||||
// Gets for the given Fingerprint either the value at that time if there is an
|
||||
// match or the one or two values adjacent thereto.
|
||||
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
|
||||
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) {
|
||||
ops := v.operations[*fingerprint]
|
||||
op, _ := getValuesAtTimes.Get()
|
||||
op.time = time
|
||||
|
@ -65,7 +65,7 @@ var getValuesAtIntervals = newValueAtIntervalList(10 * 1024)
|
|||
// Gets for the given Fingerprint either the value at that interval from From
|
||||
// through Through if there is an match or the one or two values adjacent
|
||||
// for each point.
|
||||
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
|
||||
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) {
|
||||
ops := v.operations[*fingerprint]
|
||||
op, _ := getValuesAtIntervals.Get()
|
||||
op.from = from
|
||||
|
@ -79,7 +79,7 @@ var getValuesAlongRanges = newValueAlongRangeList(10 * 1024)
|
|||
|
||||
// Gets for the given Fingerprint the values that occur inclusively from From
|
||||
// through Through.
|
||||
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
|
||||
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) {
|
||||
ops := v.operations[*fingerprint]
|
||||
op, _ := getValuesAlongRanges.Get()
|
||||
op.from = from
|
||||
|
@ -96,7 +96,7 @@ var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024)
|
|||
// ^ ^ ^ ^ ^ ^
|
||||
// | \------------/ \----/ |
|
||||
// from interval rangeDuration through
|
||||
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) {
|
||||
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) {
|
||||
ops := v.operations[*fingerprint]
|
||||
op, _ := getValuesAtIntervalAlongRanges.Get()
|
||||
op.rangeFrom = from
|
||||
|
|
|
@ -25,20 +25,20 @@ import (
|
|||
func testBuilder(t test.Tester) {
|
||||
type atTime struct {
|
||||
fingerprint string
|
||||
time time.Time
|
||||
time clientmodel.Timestamp
|
||||
}
|
||||
|
||||
type atInterval struct {
|
||||
fingerprint string
|
||||
from time.Time
|
||||
through time.Time
|
||||
from clientmodel.Timestamp
|
||||
through clientmodel.Timestamp
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
type atRange struct {
|
||||
fingerprint string
|
||||
from time.Time
|
||||
through time.Time
|
||||
from clientmodel.Timestamp
|
||||
through clientmodel.Timestamp
|
||||
}
|
||||
|
||||
type in struct {
|
||||
|
@ -62,11 +62,11 @@ func testBuilder(t test.Tester) {
|
|||
atTimes: []atTime{
|
||||
{
|
||||
fingerprint: "0000000000000001111-a-4-a",
|
||||
time: time.Unix(100, 0),
|
||||
time: clientmodel.TimestampFromUnix(100),
|
||||
},
|
||||
{
|
||||
fingerprint: "0000000000000000000-a-4-a",
|
||||
time: time.Unix(100, 0),
|
||||
time: clientmodel.TimestampFromUnix(100),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -85,19 +85,19 @@ func testBuilder(t test.Tester) {
|
|||
atTimes: []atTime{
|
||||
{
|
||||
fingerprint: "1111-a-4-a",
|
||||
time: time.Unix(100, 0),
|
||||
time: clientmodel.TimestampFromUnix(100),
|
||||
},
|
||||
{
|
||||
fingerprint: "1111-a-4-a",
|
||||
time: time.Unix(200, 0),
|
||||
time: clientmodel.TimestampFromUnix(200),
|
||||
},
|
||||
{
|
||||
fingerprint: "0-a-4-a",
|
||||
time: time.Unix(100, 0),
|
||||
time: clientmodel.TimestampFromUnix(100),
|
||||
},
|
||||
{
|
||||
fingerprint: "0-a-4-a",
|
||||
time: time.Unix(0, 0),
|
||||
time: clientmodel.TimestampFromUnix(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -116,19 +116,19 @@ func testBuilder(t test.Tester) {
|
|||
atTimes: []atTime{
|
||||
{
|
||||
fingerprint: "1111-a-4-a",
|
||||
time: time.Unix(100, 0),
|
||||
time: clientmodel.TimestampFromUnix(100),
|
||||
},
|
||||
},
|
||||
atRanges: []atRange{
|
||||
{
|
||||
fingerprint: "1111-a-4-a",
|
||||
from: time.Unix(100, 0),
|
||||
through: time.Unix(1000, 0),
|
||||
from: clientmodel.TimestampFromUnix(100),
|
||||
through: clientmodel.TimestampFromUnix(1000),
|
||||
},
|
||||
{
|
||||
fingerprint: "1111-a-4-a",
|
||||
from: time.Unix(100, 0),
|
||||
through: time.Unix(9000, 0),
|
||||
from: clientmodel.TimestampFromUnix(100),
|
||||
through: clientmodel.TimestampFromUnix(9000),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
package metric
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
@ -29,11 +27,11 @@ import (
|
|||
)
|
||||
|
||||
type watermarks struct {
|
||||
High time.Time
|
||||
High clientmodel.Timestamp
|
||||
}
|
||||
|
||||
func (w *watermarks) load(d *dto.MetricHighWatermark) {
|
||||
w.High = time.Unix(d.GetTimestamp(), 0).UTC()
|
||||
w.High = clientmodel.TimestampFromUnix(d.GetTimestamp())
|
||||
}
|
||||
|
||||
func (w *watermarks) dump(d *dto.MetricHighWatermark) {
|
||||
|
@ -42,14 +40,14 @@ func (w *watermarks) dump(d *dto.MetricHighWatermark) {
|
|||
d.Timestamp = proto.Int64(w.High.Unix())
|
||||
}
|
||||
|
||||
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time
|
||||
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]clientmodel.Timestamp
|
||||
|
||||
type HighWatermarker interface {
|
||||
raw.ForEacher
|
||||
raw.Pruner
|
||||
|
||||
UpdateBatch(FingerprintHighWatermarkMapping) error
|
||||
Get(*clientmodel.Fingerprint) (t time.Time, ok bool, err error)
|
||||
Get(*clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error)
|
||||
State() *raw.DatabaseState
|
||||
Size() (uint64, bool, error)
|
||||
}
|
||||
|
@ -58,7 +56,7 @@ type LevelDBHighWatermarker struct {
|
|||
p *leveldb.LevelDBPersistence
|
||||
}
|
||||
|
||||
func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
|
||||
func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) {
|
||||
k := new(dto.Fingerprint)
|
||||
dumpFingerprint(k, f)
|
||||
v := new(dto.MetricHighWatermark)
|
||||
|
@ -67,9 +65,9 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o
|
|||
return t, ok, err
|
||||
}
|
||||
if !ok {
|
||||
return time.Unix(0, 0), ok, nil
|
||||
return clientmodel.TimestampFromUnix(0), ok, nil
|
||||
}
|
||||
t = time.Unix(v.GetTimestamp(), 0)
|
||||
t = clientmodel.TimestampFromUnix(v.GetTimestamp())
|
||||
return t, true, nil
|
||||
}
|
||||
|
||||
|
@ -143,8 +141,8 @@ func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWat
|
|||
type CurationRemarker interface {
|
||||
raw.Pruner
|
||||
|
||||
Update(*curationKey, time.Time) error
|
||||
Get(*curationKey) (t time.Time, ok bool, err error)
|
||||
Update(*curationKey, clientmodel.Timestamp) error
|
||||
Get(*curationKey) (t clientmodel.Timestamp, ok bool, err error)
|
||||
State() *raw.DatabaseState
|
||||
Size() (uint64, bool, error)
|
||||
}
|
||||
|
@ -176,20 +174,20 @@ func (w *LevelDBCurationRemarker) Prune() (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (w *LevelDBCurationRemarker) Get(c *curationKey) (t time.Time, ok bool, err error) {
|
||||
func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp, ok bool, err error) {
|
||||
k := new(dto.CurationKey)
|
||||
c.dump(k)
|
||||
v := new(dto.CurationValue)
|
||||
|
||||
ok, err = w.p.Get(k, v)
|
||||
if err != nil || !ok {
|
||||
return time.Unix(0, 0), ok, err
|
||||
return clientmodel.TimestampFromUnix(0), ok, err
|
||||
}
|
||||
|
||||
return time.Unix(v.GetLastCompletionTimestamp(), 0).UTC(), true, nil
|
||||
return clientmodel.TimestampFromUnix(v.GetLastCompletionTimestamp()), true, nil
|
||||
}
|
||||
|
||||
func (w *LevelDBCurationRemarker) Update(pair *curationKey, t time.Time) error {
|
||||
func (w *LevelDBCurationRemarker) Update(pair *curationKey, t clientmodel.Timestamp) error {
|
||||
k := new(dto.CurationKey)
|
||||
pair.dump(k)
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
storageRoot = flag.String("storage.root", "", "The path to the storage root for Prometheus.")
|
||||
storageRoot = flag.String("storage.root", "", "The path to the storage root for Prometheus.")
|
||||
dieOnBadChunk = flag.Bool("dieOnBadChunk", false, "Whether to die upon encountering a bad chunk.")
|
||||
)
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ func (serv MetricsService) Query(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
timestamp := serv.time.Now()
|
||||
timestamp := clientmodel.TimestampFromTime(serv.time.Now())
|
||||
|
||||
queryStats := stats.NewTimerGroup()
|
||||
result := ast.EvalToString(exprNode, timestamp, format, serv.Storage, queryStats)
|
||||
|
@ -92,7 +92,7 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if end == 0 {
|
||||
end = serv.time.Now().Unix()
|
||||
end = clientmodel.Now().Unix()
|
||||
}
|
||||
|
||||
if step < 1 {
|
||||
|
@ -111,8 +111,8 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) {
|
|||
evalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
|
||||
matrix, err := ast.EvalVectorRange(
|
||||
exprNode.(ast.VectorNode),
|
||||
time.Unix(end-duration, 0).UTC(),
|
||||
time.Unix(end, 0).UTC(),
|
||||
clientmodel.TimestampFromUnix(end-duration),
|
||||
clientmodel.TimestampFromUnix(end),
|
||||
time.Duration(step)*time.Second,
|
||||
serv.Storage,
|
||||
queryStats)
|
||||
|
|
Loading…
Reference in a new issue