diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index fc5cbd19c..ebcdb45bb 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,3 +1,5 @@ # Prometheus Team +- Johannes Ziemke - Julius Volz - Matt T. Proud +- Tobias Schmidt diff --git a/Makefile b/Makefile index 9b9582be7..376d52e87 100644 --- a/Makefile +++ b/Makefile @@ -16,12 +16,14 @@ TEST_ARTIFACTS = prometheus prometheus.build search_index all: test test: build - go test ./... + go test ./... $(GO_TEST_FLAGS) build: $(MAKE) -C model $(MAKE) -C web go build ./... + +binary: build go build -o prometheus.build clean: @@ -44,4 +46,4 @@ search_index: documentation: search_index godoc -http=:6060 -index -index_files='search_index' -.PHONY: advice build clean documentation format search_index test +.PHONY: advice binary build clean documentation format search_index test diff --git a/Makefile.TRAVIS b/Makefile.TRAVIS index 74402dcdd..c80299518 100644 --- a/Makefile.TRAVIS +++ b/Makefile.TRAVIS @@ -28,6 +28,8 @@ export LDFLAGS := $(LDFLAGS) -L$(OVERLAY_ROOT)/lib export CGO_CFLAGS := $(CFLAGS) -lsnappy export CGO_LDFLAGS := $(LDFLAGS) +export GO_TEST_FLAGS := "-v" + GO_GET := go get -u -v -x APT_GET_INSTALL := sudo apt-get install -y WGET := wget -c diff --git a/README.md b/README.md index 8f1ace54a..fe4501418 100644 --- a/README.md +++ b/README.md @@ -24,12 +24,18 @@ action if some condition is observed to be true. 7. Prometheus Client, Prometheus in Prometheus (https://github.com/prometheus/client_golang). 8. Snappy, a compression library for LevelDB and Levigo (http://code.google.com/p/snappy/). -## Getting started +## Getting Started For basic help how to get started: - * For Linux users, please consult the Travis CI configuration in _.travis.yml_. + * The source code is periodically indexed: [Prometheus Core](http://godoc.org/github.com/prometheus/prometheus). + * For Linux users, please consult the Travis CI configuration in _.travis.yml_ and _Makefile.TRAVIS_. * [Getting started on Mac OSX](documentation/guides/getting-started-osx.md) + * All of the core developers are accessible via the [Prometheus Developers Mailinglist](https://groups.google.com/forum/?fromgroups#!forum/prometheus-developers). + +## Testing + +[![Build Status](https://travis-ci.org/prometheus/prometheus.png)](https://travis-ci.org/prometheus/prometheus) ## License diff --git a/coding/protocol_buffer.go b/coding/protocol_buffer.go index 273395207..19c8bef15 100644 --- a/coding/protocol_buffer.go +++ b/coding/protocol_buffer.go @@ -15,13 +15,14 @@ package coding import ( "code.google.com/p/goprotobuf/proto" + "fmt" ) -type ProtocolBufferEncoder struct { +type ProtocolBuffer struct { message proto.Message } -func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { +func (p ProtocolBuffer) Encode() (raw []byte, err error) { raw, err = proto.Marshal(p.message) // XXX: Adjust legacy users of this to not check for error. @@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { return } -func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { - return &ProtocolBufferEncoder{ +func (p ProtocolBuffer) String() string { + return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message) +} + +func NewProtocolBuffer(message proto.Message) *ProtocolBuffer { + return &ProtocolBuffer{ message: message, } } diff --git a/main.go b/main.go index 6ce8aadfa..2f3cd78fa 100644 --- a/main.go +++ b/main.go @@ -95,7 +95,7 @@ func main() { case ruleResult := <-ruleResults: for _, sample := range ruleResult.Samples { - ts.AppendSample(*sample) + ts.AppendSample(sample) } } } diff --git a/model/fingerprinting.go b/model/fingerprinting.go index 6dd1257f9..efdd64fa8 100644 --- a/model/fingerprinting.go +++ b/model/fingerprinting.go @@ -120,6 +120,10 @@ type fingerprint struct { lastCharacterOfLastLabelValue string } +func (f fingerprint) String() string { + return f.ToRowKey() +} + func (f fingerprint) ToRowKey() string { return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter) } diff --git a/model/watermark.go b/model/watermark.go index 93cf47c91..e6126bd41 100644 --- a/model/watermark.go +++ b/model/watermark.go @@ -14,6 +14,7 @@ package model import ( + "code.google.com/p/goprotobuf/proto" dto "github.com/prometheus/prometheus/model/generated" "time" ) @@ -24,6 +25,14 @@ type Watermark struct { time.Time } +// ToMetricHighWatermarkDTO builds a MetricHighWatermark DTO out of a given +// Watermark. +func (w Watermark) ToMetricHighWatermarkDTO() *dto.MetricHighWatermark { + return &dto.MetricHighWatermark{ + Timestamp: proto.Int64(w.Time.Unix()), + } +} + // NewWatermarkFromHighWatermarkDTO builds Watermark from the provided // dto.MetricHighWatermark object. func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { @@ -31,3 +40,10 @@ func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { time.Unix(*d.Timestamp, 0), } } + +// NewWatermarkFromTime builds a new Watermark for the provided time. +func NewWatermarkFromTime(t time.Time) Watermark { + return Watermark{ + t, + } +} diff --git a/rules/ast/ast.go b/rules/ast/ast.go index a0e7c66e3..bfc4d39f2 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -27,8 +27,8 @@ import ( // ---------------------------------------------------------------------------- // Raw data value types. -type Vector []*model.Sample -type Matrix []*model.SampleSet +type Vector model.Samples +type Matrix []model.SampleSet type groupedAggregation struct { labels model.Metric @@ -97,23 +97,23 @@ type Node interface { // interface represents the type returned to the parent node. type ScalarNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue + Eval(timestamp time.Time, view *viewAdapter) model.SampleValue } type VectorNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) Vector + Eval(timestamp time.Time, view *viewAdapter) Vector } type MatrixNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) Matrix - EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix + Eval(timestamp time.Time, view *viewAdapter) Matrix + EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix } type StringNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) string + Eval(timestamp time.Time, view *viewAdapter) string } // ---------------------------------------------------------------------------- @@ -146,6 +146,8 @@ type ( // Vector literal, i.e. metric name plus labelset. VectorLiteral struct { labels model.LabelSet + // Fingerprints are populated from labels at query analysis time. + fingerprints model.Fingerprints } // A function of vector return type. @@ -175,8 +177,10 @@ type ( type ( // Matrix literal, i.e. metric name plus labelset and timerange. MatrixLiteral struct { - labels model.LabelSet - interval time.Duration + labels model.LabelSet + // Fingerprints are populated from labels at query analysis time. + fingerprints model.Fingerprints + interval time.Duration } ) @@ -223,17 +227,17 @@ func (node MatrixLiteral) Children() []Node { return []Node{} } func (node StringLiteral) Children() []Node { return []Node{} } func (node StringFunctionCall) Children() []Node { return node.args } -func (node *ScalarLiteral) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { +func (node *ScalarLiteral) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue { return node.value } -func (node *ScalarArithExpr) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { +func (node *ScalarArithExpr) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue { lhs := node.lhs.Eval(timestamp, view) rhs := node.rhs.Eval(timestamp, view) return evalScalarBinop(node.opType, lhs, rhs) } -func (node *ScalarFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { +func (node *ScalarFunctionCall) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue { return node.function.callFn(timestamp, view, node.args).(model.SampleValue) } @@ -259,7 +263,7 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector) { if err != nil { return } - return node.Eval(×tamp, viewAdapter) + return node.Eval(timestamp, viewAdapter) } func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) { @@ -274,7 +278,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t // TODO implement watchdog timer for long-running queries. sampleSets := map[string]*model.SampleSet{} for t := start; t.Before(end); t = t.Add(interval) { - vector := node.Eval(&t, viewAdapter) + vector := node.Eval(t, viewAdapter) for _, sample := range vector { samplePair := model.SamplePair{ Value: sample.Value, @@ -293,7 +297,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t } for _, sampleSet := range sampleSets { - matrix = append(matrix, sampleSet) + matrix = append(matrix, *sampleSet) } return } @@ -308,23 +312,23 @@ func labelIntersection(metric1, metric2 model.Metric) model.Metric { return intersection } -func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp *time.Time) Vector { +func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp time.Time) Vector { vector := Vector{} for _, aggregation := range aggregations { if node.aggrType == AVG { aggregation.value = aggregation.value / model.SampleValue(aggregation.groupCount) } - sample := &model.Sample{ + sample := model.Sample{ Metric: aggregation.labels, Value: aggregation.value, - Timestamp: *timestamp, + Timestamp: timestamp, } vector = append(vector, sample) } return vector } -func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorAggregation) Eval(timestamp time.Time, view *viewAdapter) Vector { vector := node.vector.Eval(timestamp, view) result := map[string]*groupedAggregation{} for _, sample := range vector { @@ -357,8 +361,8 @@ func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vec return node.groupedAggregationsToVector(result, timestamp) } -func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector { - values, err := view.GetValueAtTime(node.labels, timestamp) +func (node *VectorLiteral) Eval(timestamp time.Time, view *viewAdapter) Vector { + values, err := view.GetValueAtTime(node.fingerprints, timestamp) if err != nil { log.Printf("Unable to get vector values") return Vector{} @@ -366,7 +370,7 @@ func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector return values } -func (node *VectorFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorFunctionCall) Eval(timestamp time.Time, view *viewAdapter) Vector { return node.function.callFn(timestamp, view, node.args).(Vector) } @@ -510,7 +514,7 @@ func labelsEqual(labels1, labels2 model.Metric) bool { return true } -func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorArithExpr) Eval(timestamp time.Time, view *viewAdapter) Vector { lhs := node.lhs.Eval(timestamp, view) result := Vector{} if node.rhs.Type() == SCALAR { @@ -541,12 +545,12 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vecto panic("Invalid vector arithmetic expression operands") } -func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix { +func (node *MatrixLiteral) Eval(timestamp time.Time, view *viewAdapter) Matrix { interval := &model.Interval{ OldestInclusive: timestamp.Add(-node.interval), - NewestInclusive: *timestamp, + NewestInclusive: timestamp, } - values, err := view.GetRangeValues(node.labels, interval) + values, err := view.GetRangeValues(node.fingerprints, interval) if err != nil { log.Printf("Unable to get values for vector interval") return Matrix{} @@ -554,12 +558,12 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix return values } -func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix { +func (node *MatrixLiteral) EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix { interval := &model.Interval{ OldestInclusive: timestamp.Add(-node.interval), - NewestInclusive: *timestamp, + NewestInclusive: timestamp, } - values, err := view.GetBoundaryValues(node.labels, interval) + values, err := view.GetBoundaryValues(node.fingerprints, interval) if err != nil { log.Printf("Unable to get boundary values for vector interval") return Matrix{} @@ -579,11 +583,11 @@ func (matrix Matrix) Swap(i, j int) { matrix[i], matrix[j] = matrix[j], matrix[i] } -func (node *StringLiteral) Eval(timestamp *time.Time, view *viewAdapter) string { +func (node *StringLiteral) Eval(timestamp time.Time, view *viewAdapter) string { return node.str } -func (node *StringFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) string { +func (node *StringFunctionCall) Eval(timestamp time.Time, view *viewAdapter) string { return node.function.callFn(timestamp, view, node.args).(string) } diff --git a/rules/ast/functions.go b/rules/ast/functions.go index 55356c977..4cf5047c9 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -24,7 +24,7 @@ type Function struct { name string argTypes []ExprType returnType ExprType - callFn func(timestamp *time.Time, view *viewAdapter, args []Node) interface{} + callFn func(timestamp time.Time, view *viewAdapter, args []Node) interface{} } func (function *Function) CheckArgTypes(args []Node) error { @@ -63,17 +63,17 @@ func (function *Function) CheckArgTypes(args []Node) error { } // === time() === -func timeImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func timeImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { return model.SampleValue(time.Now().Unix()) } // === count(vector VectorNode) === -func countImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func countImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view))) } // === delta(matrix MatrixNode, isCounter ScalarNode) === -func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { matrixNode := args[0].(MatrixNode) isCounter := int(args[1].(ScalarNode).Eval(timestamp, view)) resultVector := Vector{} @@ -98,10 +98,10 @@ func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} lastValue = currentValue } resultValue := lastValue - samples.Values[0].Value + counterCorrection - resultSample := &model.Sample{ + resultSample := model.Sample{ Metric: samples.Metric, Value: resultValue, - Timestamp: *timestamp, + Timestamp: timestamp, } resultVector = append(resultVector, resultSample) } @@ -109,7 +109,7 @@ func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} } // === rate(node *MatrixNode) === -func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func rateImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { args = append(args, &ScalarLiteral{value: 1}) vector := deltaImpl(timestamp, view, args).(Vector) @@ -124,36 +124,36 @@ func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} } // === sampleVectorImpl() === -func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func sampleVectorImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { return Vector{ - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", "instance": "0", }, Value: 10, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", "instance": "1", }, Value: 20, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", "instance": "2", }, Value: 30, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -161,9 +161,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "canary", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -171,9 +171,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "canary", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -181,9 +181,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "mytest", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -191,7 +191,7 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "mytest", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, } } diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 9fa82c675..682f643e6 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -27,66 +27,95 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal var queryStorage metric.Storage = nil type viewAdapter struct { - view metric.View - // TODO: use this. + view metric.View stalenessPolicy *metric.StalenessPolicy } -func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) { - var minDelta time.Duration - for _, candidate := range samples { - // Ignore samples outside of staleness policy window. - delta := candidate.Timestamp.Sub(*timestamp) - if delta < 0 { - delta = -delta - } - if delta > v.stalenessPolicy.DeltaAllowance { - continue - } +// interpolateSamples interpolates a value at a target time between two +// provided sample pairs. +func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *model.SamplePair { + dv := second.Value - first.Value + dt := second.Timestamp.Sub(first.Timestamp) - // Skip sample if we've seen a closer one before this. - if sample != nil { - if delta > minDelta { + dDt := dv / model.SampleValue(dt) + offset := model.SampleValue(timestamp.Sub(first.Timestamp)) + + return &model.SamplePair{ + Value: first.Value + (offset * dDt), + Timestamp: timestamp, + } +} + +// chooseClosestSample chooses the closest sample of a list of samples +// surrounding a given target time. If samples are found both before and after +// the target time, the sample value is interpolated between these. Otherwise, +// the single closest sample is returned verbatim. +func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp time.Time) (sample *model.SamplePair) { + var closestBefore *model.SamplePair + var closestAfter *model.SamplePair + for _, candidate := range samples { + delta := candidate.Timestamp.Sub(timestamp) + // Samples before target time. + if delta < 0 { + // Ignore samples outside of staleness policy window. + if -delta > v.stalenessPolicy.DeltaAllowance { continue } + // Ignore samples that are farther away than what we've seen before. + if closestBefore != nil && candidate.Timestamp.Before(closestBefore.Timestamp) { + continue + } + sample := candidate + closestBefore = &sample } - sample = &candidate - minDelta = delta + // Samples after target time. + if delta >= 0 { + // Ignore samples outside of staleness policy window. + if delta > v.stalenessPolicy.DeltaAllowance { + continue + } + // Ignore samples that are farther away than samples we've seen before. + if closestAfter != nil && candidate.Timestamp.After(closestAfter.Timestamp) { + continue + } + sample := candidate + closestAfter = &sample + } } + + switch { + case closestBefore != nil && closestAfter != nil: + sample = interpolateSamples(closestBefore, closestAfter, timestamp) + case closestBefore != nil: + sample = closestBefore + default: + sample = closestAfter + } + return } -func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) { for _, fingerprint := range fingerprints { - sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp) + sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp) m, err := queryStorage.GetMetricForFingerprint(fingerprint) if err != nil { continue } if samplePair != nil { - samples = append(samples, &model.Sample{ + samples = append(samples, model.Sample{ Metric: *m, Value: samplePair.Value, - Timestamp: *timestamp, + Timestamp: timestamp, }) } } return } -func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { for _, fingerprint := range fingerprints { // TODO: change to GetBoundaryValues() once it has the right return type. samplePairs := v.view.GetRangeValues(fingerprint, *interval) @@ -100,7 +129,7 @@ func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.I continue } - sampleSet := &model.SampleSet{ + sampleSet := model.SampleSet{ Metric: *m, Values: samplePairs, } @@ -109,12 +138,7 @@ func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.I return sampleSets, nil } -func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { for _, fingerprint := range fingerprints { samplePairs := v.view.GetRangeValues(fingerprint, *interval) if samplePairs == nil { @@ -127,7 +151,7 @@ func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Inte continue } - sampleSet := &model.SampleSet{ + sampleSet := model.SampleSet{ Metric: *m, Values: samplePairs, } diff --git a/rules/ast/printer.go b/rules/ast/printer.go index 17062bda2..2864361dc 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -152,8 +152,8 @@ func TypedValueToJSON(data interface{}, typeStr string) string { return string(dataJSON) } -func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { - viewAdapter, err := viewAdapterForInstantQuery(node, *timestamp) +func EvalToString(node Node, timestamp time.Time, format OutputFormat) string { + viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) if err != nil { panic(err) } diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index 17578509d..2c6ec92f8 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -66,6 +66,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return } + n.fingerprints = fingerprints for _, fingerprint := range fingerprints { if !analyzer.IntervalRanges[fingerprint] { analyzer.IntervalRanges[fingerprint] = true @@ -77,6 +78,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return } + n.fingerprints = fingerprints for _, fingerprint := range fingerprints { interval := n.interval // If an interval has already been recorded for this fingerprint, merge diff --git a/rules/lexer.l b/rules/lexer.l index 643f52fc6..1edfcc6b0 100644 --- a/rules/lexer.l +++ b/rules/lexer.l @@ -45,7 +45,7 @@ AVG|SUM|MAX|MIN { yylval.str = yytext; return AGGR_OP } [*/%] { yylval.str = yytext; return MULT_OP } {D}+{U} { yylval.str = yytext; return DURATION } -{L}({L}|{D})+ { yylval.str = yytext; return IDENTIFIER } +{L}({L}|{D})* { yylval.str = yytext; return IDENTIFIER } \-?{D}+(\.{D}*)? { num, err := strconv.ParseFloat(yytext, 32); if (err != nil && err.(*strconv.NumError).Err == strconv.ErrSyntax) { diff --git a/rules/lexer.l.go b/rules/lexer.l.go index 7ec712fd6..0671c45af 100644 --- a/rules/lexer.l.go +++ b/rules/lexer.l.go @@ -411,7 +411,7 @@ var yyrules []yyrule = []yyrule{{regexp.MustCompile("[^\\n]"), nil, []yystartcon return yyactionreturn{DURATION, yyRT_USER_RETURN} } return yyactionreturn{0, yyRT_FALLTHROUGH} -}}, {regexp.MustCompile("([a-zA-Z_:])(([a-zA-Z_:])|([0-9]))+"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { +}}, {regexp.MustCompile("([a-zA-Z_:])(([a-zA-Z_:])|([0-9]))*"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { defer func() { if r := recover(); r != nil { if r != "yyREJECT" { diff --git a/rules/rules_test.go b/rules/rules_test.go index 43e8ae554..1b928ffd9 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -179,6 +179,13 @@ var expressionTests = []struct { }, fullRanges: 8, intervalRanges: 0, + }, { + expr: "x{y='testvalue'}", + output: []string{ + "x{y='testvalue'} => 100 @[%v]", + }, + fullRanges: 0, + intervalRanges: 1, // Invalid expressions that should fail to parse. }, { expr: "", @@ -241,7 +248,7 @@ func TestExpressions(t *testing.T) { t.Errorf("Test should fail, but didn't") } failed := false - resultStr := ast.EvalToString(testExpr, &testEvalTime, ast.TEXT) + resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT) resultLines := strings.Split(resultStr, "\n") if len(exprTest.output) != len(resultLines) { diff --git a/rules/testdata.go b/rules/testdata.go index 7e853f594..7a900bab0 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -42,7 +42,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { vector := ast.Vector{} for _, sampleSet := range matrix { lastSample := sampleSet.Values[len(sampleSet.Values)-1] - vector = append(vector, &model.Sample{ + vector = append(vector, model.Sample{ Metric: sampleSet.Metric, Value: lastSample.Value, Timestamp: lastSample.Timestamp, @@ -140,6 +140,14 @@ var testMatrix = ast.Matrix{ }, Values: getTestValueStream(0, 800, 80), }, + // Single-letter metric and label names. + { + Metric: model.Metric{ + model.MetricNameLabel: "x", + "y": "testvalue", + }, + Values: getTestValueStream(0, 100, 10), + }, } var testVector = getTestVectorFromTestMatrix(testMatrix) diff --git a/storage/metric/.gitignore b/storage/metric/.gitignore new file mode 100644 index 000000000..3460f0346 --- /dev/null +++ b/storage/metric/.gitignore @@ -0,0 +1 @@ +command-line-arguments.test diff --git a/storage/metric/curator.go b/storage/metric/curator.go index fb3d64646..592f4ae87 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -37,8 +37,9 @@ type curator struct { // watermarks is the on-disk store that is scanned for high watermarks for // given metrics. watermarks raw.Persistence - // cutOff represents the most recent time up to which values will be curated. - cutOff time.Time + // recencyThreshold represents the most recent time up to which values will be + // curated. + recencyThreshold time.Duration // groupingQuantity represents the number of samples below which encountered // samples will be dismembered and reaggregated into larger groups. groupingQuantity uint32 @@ -48,9 +49,9 @@ type curator struct { } // newCurator builds a new curator for the given LevelDB databases. -func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { +func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { return curator{ - cutOff: cutOff, + recencyThreshold: recencyThreshold, stop: make(chan bool), samples: samples, curationState: curationState, @@ -60,18 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample } // run facilitates the curation lifecycle. -func (c curator) run() (err error) { - var ( - decoder watermarkDecoder - filter = watermarkFilter{ - stop: c.stop, - } - operator = watermarkOperator{ - olderThan: c.cutOff, - groupSize: c.groupingQuantity, - curationState: c.curationState, - } - ) +func (c curator) run(instant time.Time) (err error) { + decoder := watermarkDecoder{} + filter := watermarkFilter{ + stop: c.stop, + curationState: c.curationState, + groupSize: c.groupingQuantity, + recencyThreshold: c.recencyThreshold, + } + operator := watermarkOperator{ + olderThan: instant.Add(-1 * c.recencyThreshold), + groupSize: c.groupingQuantity, + curationState: c.curationState, + } _, err = c.watermarks.ForEach(decoder, filter, operator) @@ -125,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro // watermarkFilter determines whether to include or exclude candidate // values from the curation process by virtue of how old the high watermark is. type watermarkFilter struct { - // curationState is the table of CurationKey to CurationValues that remark on + // curationState is the table of CurationKey to CurationValues that rema // far along the curation process has gone for a given metric fingerprint. curationState raw.Persistence // stop, when non-empty, instructs the filter to stop operation. stop chan bool + // groupSize refers to the target groupSize from the curator. + groupSize uint32 + // recencyThreshold refers to the target recencyThreshold from the curator. + recencyThreshold time.Duration } func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { - var ( - fingerprint = key.(model.Fingerprint) - watermark = value.(model.Watermark) - curationKey = fingerprint.ToDTO() - rawCurationValue []byte - err error - curationValue = &dto.CurationValue{} - ) + fingerprint := key.(model.Fingerprint) + watermark := value.(model.Watermark) + curationKey := &dto.CurationKey{ + Fingerprint: fingerprint.ToDTO(), + MinimumGroupSize: proto.Uint32(w.groupSize), + OlderThan: proto.Int64(int64(w.recencyThreshold)), + } + curationValue := &dto.CurationValue{} - rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { panic(err) } @@ -228,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er MinimumGroupSize: proto.Uint32(w.groupSize), } - curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey)) + curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey)) return } @@ -246,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod } ) - rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { return } diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 5adb75c2e..c950ffb42 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -14,200 +14,510 @@ package metric import ( + "code.google.com/p/goprotobuf/proto" "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/raw" - "sort" + dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw/leveldb" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" ) type ( - keyPair struct { - fingerprint model.Fingerprint - time time.Time + curationState struct { + fingerprint string + groupSize int + recencyThreshold time.Duration + lastCurated time.Time } - fakeCurationStates map[model.Fingerprint]time.Time - fakeSamples map[keyPair][]float32 - fakeWatermarks map[model.Fingerprint]time.Time + watermarkState struct { + fingerprint string + lastAppended time.Time + } + + sample struct { + time time.Time + value float32 + } + + sampleGroup struct { + fingerprint string + values []sample + } in struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks - cutOff time.Time - grouping uint32 - } - - out struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs + recencyThreshold time.Duration + groupSize uint32 } ) -func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (c curationState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBuffer(&dto.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), + MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), + OlderThan: proto.Int64(int64(c.recencyThreshold)), + }) -func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeCurationStates) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Close() error { - panic("unimplemented") -} - -func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - var ( - fingerprints model.Fingerprints - ) - - for f := range c { - fingerprints = append(fingerprints, f) - } - - sort.Sort(fingerprints) - - for _, k := range fingerprints { - v := c[k] - - var ( - decodedKey interface{} - decodedValue interface{} - ) - - decodedKey, err = d.DecodeKey(k) - if err != nil { - continue - } - - decodedValue, err = d.DecodeValue(v) - if err != nil { - continue - } - - switch f.Filter(decodedKey, decodedValue) { - case storage.STOP: - return - case storage.SKIP: - continue - case storage.ACCEPT: - opErr := o.Operate(decodedKey, decodedValue) - if opErr != nil { - if opErr.Continuable { - continue - } - break - } - } - } + value = coding.NewProtocolBuffer(&dto.CurationValue{ + LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), + }) return } -func (c fakeCurationStates) Commit(_ raw.Batch) error { - panic("unimplemented") +func (w watermarkState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + return } -func (c fakeSamples) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (s sampleGroup) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBuffer(&dto.SampleKey{ + Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), + Timestamp: indexable.EncodeTime(s.values[0].time), + LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), + SampleCount: proto.Uint32(uint32(len(s.values))), + }) -func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} + series := &dto.SampleValueSeries{} -func (c fakeSamples) Drop(_ coding.Encoder) error { - panic("unimplemented") -} + for _, value := range s.values { + series.Value = append(series.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(value.time.Unix()), + Value: proto.Float32(float32(value.value)), + }) + } -func (c fakeSamples) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} + value = coding.NewProtocolBuffer(series) -func (c fakeSamples) Close() error { - panic("unimplemented") -} - -func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeSamples) Commit(_ raw.Batch) (err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Close() error { - panic("unimplemented") -} - -func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Commit(_ raw.Batch) (err error) { - panic("unimplemented") + return } func TestCurator(t *testing.T) { var ( scenarios = []struct { - in in - out out + in in }{ { in: in{ - curationStates: fakeCurationStates{ - model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute), - model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute), + recencyThreshold: 1 * time.Hour, + groupSize: 5, + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + }, + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + // This rule should effectively be ignored. + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 2, + recencyThreshold: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-2-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 55 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 50 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 45 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 40 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 25 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 89 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 88 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 87 * time.Minute), + value: 3, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 86 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 5, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 84 * time.Minute), + value: 6, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 83 * time.Minute), + value: 7, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 82 * time.Minute), + value: 8, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 81 * time.Minute), + value: 9, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 10, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 79 * time.Minute), + value: 11, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 78 * time.Minute), + value: 12, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 77 * time.Minute), + value: 13, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 76 * time.Minute), + value: 14, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 15, + }, + { + time: testInstant.Add(-1 * 74 * time.Minute), + value: 16, + }, + { + time: testInstant.Add(-1 * 73 * time.Minute), + value: 17, + }, + { + time: testInstant.Add(-1 * 72 * time.Minute), + value: 18, + }, + { + time: testInstant.Add(-1 * 71 * time.Minute), + value: 19, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 20, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 69 * time.Minute), + value: 21, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 68 * time.Minute), + value: 22, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 67 * time.Minute), + value: 23, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 66 * time.Minute), + value: 24, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 25, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 64 * time.Minute), + value: 26, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 63 * time.Minute), + value: 27, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 62 * time.Minute), + value: 28, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 61 * time.Minute), + value: 29, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 30, + }, + }, + }, }, - watermarks: fakeWatermarks{}, - samples: fakeSamples{}, - cutOff: testInstant.Add(5 * time.Minute), - grouping: 5, }, }, } ) for _, scenario := range scenarios { - var ( - in = scenario.in + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) + defer curatorDirectory.Close() - curationStates = in.curationStates - samples = in.samples - watermarks = in.watermarks - cutOff = in.cutOff - grouping = in.grouping - ) + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) + defer watermarkDirectory.Close() - _ = newCurator(cutOff, grouping, curationStates, samples, watermarks) + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) + defer sampleDirectory.Close() + + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer curatorStates.Close() + + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer watermarkStates.Close() + + samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer samples.Close() + + c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates) + c.run(testInstant) } } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 9c4183902..ecc5c9321 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) Timestamp: upperSeek, } - raw, err := coding.NewProtocolBufferEncoder(key).Encode() + raw, err := coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } @@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) key.Timestamp = lowerSeek - raw, err = coding.NewProtocolBufferEncoder(key).Encode() + raw, err = coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 42979a6a4..cb3381fb0 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -63,6 +63,8 @@ var ( ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, } + curationDuration = metrics.NewCounter() + curationDurations = metrics.NewHistogram(diskLatencyHistogram) storageOperations = metrics.NewCounter() storageOperationDurations = metrics.NewCounter() storageLatency = metrics.NewHistogram(diskLatencyHistogram) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 654ae978b..d1d59f98d 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -339,7 +339,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelNameToFingerprints.Commit(batch) @@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelSetToFingerprints.Commit(batch) @@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(fingerprint.ToDTO()) + value := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, value) } @@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // WART: We should probably encode simple fingerprints. for _, metric := range absentMetrics { - key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger value = &dto.MetricHighWatermark{} raw []byte newestSampleTimestamp = samples[len(samples)-1].Timestamp - keyEncoded = coding.NewProtocolBufferEncoder(key) + keyEncoded = coding.NewProtocolBuffer(key) ) key.Signature = proto.String(fingerprint.ToRowKey()) @@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } } value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value)) + batch.Put(keyEncoded, coding.NewProtocolBuffer(value)) mutationCount++ } @@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } } @@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.metricMembershipIndex.Has(dtoKey) return @@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) return @@ -782,7 +782,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) return @@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)) + f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO)) if err != nil { return fps, err } @@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() - raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) + raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) if err != nil { return } @@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() - raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) + raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) if err != nil { return } @@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T Timestamp: indexable.EncodeTime(t), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } @@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index 959f3f784..c4c08a668 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -21,7 +21,9 @@ import ( ) var ( - testInstant = time.Time{} + // ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20`` + usEastern, _ = time.LoadLocation("US/Eastern") + testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern) ) func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index c8dc40d1e..81d0fee67 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier } // Try seeking to target key. - rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode() iterator.Seek(rawKey) foundKey, err := extractSampleKey(iterator) @@ -562,7 +562,7 @@ func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fin if err != nil { return } - diskFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) + diskFingerprints, err := t.diskStorage.GetFingerprintsForLabelSet(labelSet) if err != nil { return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index ed01cab08..fbd9143b1 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -14,7 +14,6 @@ package metric import ( - "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/utility/test" "sort" @@ -380,9 +379,7 @@ func testMakeView(t test.Tester) { } } - start := time.Now() tiered.Flush() - fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data)) requestBuilder := NewViewRequestBuilder() @@ -530,3 +527,64 @@ func TestGetAllValuesForLabel(t *testing.T) { } } } + +func TestGetFingerprintsForLabelSet(t *testing.T) { + tiered, closer := newTestTieredStorage(t) + defer closer.Close() + memorySample := model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/foo"}, + } + diskSample := model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/bar"}, + } + if err := tiered.(*tieredStorage).memoryArena.AppendSample(memorySample); err != nil { + t.Fatalf("Failed to add fixture data: %s", err) + } + if err := tiered.(*tieredStorage).diskStorage.AppendSample(diskSample); err != nil { + t.Fatalf("Failed to add fixture data: %s", err) + } + tiered.Flush() + + scenarios := []struct { + labels model.LabelSet + fpCount int + }{ + { + labels: model.LabelSet{}, + fpCount: 0, + }, { + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + }, + fpCount: 2, + }, { + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + "method": "/foo", + }, + fpCount: 1, + }, { + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + "method": "/bar", + }, + fpCount: 1, + }, { + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + "method": "/baz", + }, + fpCount: 0, + }, + } + + for i, scenario := range scenarios { + fingerprints, err := tiered.GetFingerprintsForLabelSet(scenario.labels) + if err != nil { + t.Fatalf("%d. Error getting metric names: %s", i, err) + } + if len(fingerprints) != scenario.fpCount { + t.Fatalf("%d. Expected metric count %d, got %d", i, scenario.fpCount, len(fingerprints)) + } + } +} diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index e00152d3d..a877a6d87 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -21,7 +21,7 @@ import ( ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{}) + existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go new file mode 100644 index 000000000..2cb8d35d9 --- /dev/null +++ b/storage/raw/leveldb/test/fixtures.go @@ -0,0 +1,115 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/utility/test" +) + +const ( + cacheCapacity = 0 + bitsPerBloomFilterEncoded = 0 +) + +type ( + // Pair models a prospective (key, value) double that will be committed to + // a database. + Pair interface { + Get() (key, value coding.Encoder) + } + + // Pairs models a list of Pair for disk committing. + Pairs []Pair + + // Preparer readies a LevelDB store for a given raw state given the fixtures + // definitions passed into it. + Preparer interface { + // Prepare furnishes the database and returns its path along with any + // encountered anomalies. + Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory + } + + FixtureFactory interface { + // HasNext indicates whether the FixtureFactory has more pending fixture + // data to build. + HasNext() (has bool) + // Next emits the next (key, value) double for storage. + Next() (key coding.Encoder, value coding.Encoder) + } + + preparer struct { + tester test.Tester + } + + cassetteFactory struct { + index int + count int + pairs Pairs + } +) + +func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { + t = test.NewTemporaryDirectory(n, p.tester) + persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) + if err != nil { + defer t.Close() + p.tester.Fatal(err) + } + defer func() { + err = persistence.Close() + if err != nil { + p.tester.Fatal(err) + } + }() + + for f.HasNext() { + key, value := f.Next() + + err = persistence.Put(key, value) + if err != nil { + defer t.Close() + p.tester.Fatal(err) + } + } + + return +} + +func (f cassetteFactory) HasNext() bool { + return f.index < f.count +} + +func (f *cassetteFactory) Next() (key, value coding.Encoder) { + key, value = f.pairs[f.index].Get() + + f.index++ + + return +} + +// NewPreparer creates a new Preparer for use in testing scenarios. +func NewPreparer(t test.Tester) Preparer { + return preparer{t} +} + +// NewCassetteFactory builds a new FixtureFactory that uses Pairs as the basis +// for generated fixture data. +func NewCassetteFactory(pairs Pairs) FixtureFactory { + return &cassetteFactory{ + pairs: pairs, + count: len(pairs), + } +} diff --git a/utility/test/directory.go b/utility/test/directory.go index 537d8ade4..4263fcf4b 100644 --- a/utility/test/directory.go +++ b/utility/test/directory.go @@ -28,33 +28,35 @@ const ( NilCloser = nilCloser(true) ) -type Closer interface { - // Close reaps the underlying directory and its children. The directory - // could be deleted by its users already. - Close() -} +type ( + Closer interface { + // Close reaps the underlying directory and its children. The directory + // could be deleted by its users already. + Close() + } -type nilCloser bool + nilCloser bool + + // TemporaryDirectory models a closeable path for transient POSIX disk + // activities. + TemporaryDirectory interface { + Closer + + // Path returns the underlying path for access. + Path() string + } + + // temporaryDirectory is kept as a private type due to private fields and + // their interactions. + temporaryDirectory struct { + path string + tester Tester + } +) func (c nilCloser) Close() { } -// TemporaryDirectory models a closeable path for transient POSIX disk -// activities. -type TemporaryDirectory interface { - Closer - - // Path returns the underlying path for access. - Path() string -} - -// temporaryDirectory is kept as a private type due to private fields and their -// interactions. -type temporaryDirectory struct { - path string - tester Tester -} - func (t temporaryDirectory) Close() { err := os.RemoveAll(t.path) if err != nil { diff --git a/web/api/query.go b/web/api/query.go index 3021c1a78..5b39e7a24 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -44,7 +44,7 @@ func (serv MetricsService) Query(expr string, formatJson string) (result string) rb.SetContentType(gorest.Text_Plain) } - return ast.EvalToString(exprNode, ×tamp, format) + return ast.EvalToString(exprNode, timestamp, format) } func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string {