Merge branch 'master' of github.com:prometheus/prometheus into feature/navigation

Conflicts:
	web/web.go
This commit is contained in:
Johannes 'fish' Ziemke 2013-04-05 14:16:33 +02:00
commit c108a9978d
30 changed files with 916 additions and 338 deletions

View file

@ -1,3 +1,5 @@
# Prometheus Team
- Johannes Ziemke
- Julius Volz
- Matt T. Proud
- Tobias Schmidt

View file

@ -16,12 +16,14 @@ TEST_ARTIFACTS = prometheus prometheus.build search_index
all: test
test: build
go test ./...
go test ./... $(GO_TEST_FLAGS)
build:
$(MAKE) -C model
$(MAKE) -C web
go build ./...
binary: build
go build -o prometheus.build
clean:
@ -44,4 +46,4 @@ search_index:
documentation: search_index
godoc -http=:6060 -index -index_files='search_index'
.PHONY: advice build clean documentation format search_index test
.PHONY: advice binary build clean documentation format search_index test

View file

@ -28,6 +28,8 @@ export LDFLAGS := $(LDFLAGS) -L$(OVERLAY_ROOT)/lib
export CGO_CFLAGS := $(CFLAGS) -lsnappy
export CGO_LDFLAGS := $(LDFLAGS)
export GO_TEST_FLAGS := "-v"
GO_GET := go get -u -v -x
APT_GET_INSTALL := sudo apt-get install -y
WGET := wget -c

View file

@ -24,12 +24,18 @@ action if some condition is observed to be true.
7. Prometheus Client, Prometheus in Prometheus (https://github.com/prometheus/client_golang).
8. Snappy, a compression library for LevelDB and Levigo (http://code.google.com/p/snappy/).
## Getting started
## Getting Started
For basic help how to get started:
* For Linux users, please consult the Travis CI configuration in _.travis.yml_.
* The source code is periodically indexed: [Prometheus Core](http://godoc.org/github.com/prometheus/prometheus).
* For Linux users, please consult the Travis CI configuration in _.travis.yml_ and _Makefile.TRAVIS_.
* [Getting started on Mac OSX](documentation/guides/getting-started-osx.md)
* All of the core developers are accessible via the [Prometheus Developers Mailinglist](https://groups.google.com/forum/?fromgroups#!forum/prometheus-developers).
## Testing
[![Build Status](https://travis-ci.org/prometheus/prometheus.png)](https://travis-ci.org/prometheus/prometheus)
## License

View file

@ -15,13 +15,14 @@ package coding
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
)
type ProtocolBufferEncoder struct {
type ProtocolBuffer struct {
message proto.Message
}
func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) {
func (p ProtocolBuffer) Encode() (raw []byte, err error) {
raw, err = proto.Marshal(p.message)
// XXX: Adjust legacy users of this to not check for error.
@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) {
return
}
func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder {
return &ProtocolBufferEncoder{
func (p ProtocolBuffer) String() string {
return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message)
}
func NewProtocolBuffer(message proto.Message) *ProtocolBuffer {
return &ProtocolBuffer{
message: message,
}
}

View file

@ -95,7 +95,7 @@ func main() {
case ruleResult := <-ruleResults:
for _, sample := range ruleResult.Samples {
ts.AppendSample(*sample)
ts.AppendSample(sample)
}
}
}

View file

@ -120,6 +120,10 @@ type fingerprint struct {
lastCharacterOfLastLabelValue string
}
func (f fingerprint) String() string {
return f.ToRowKey()
}
func (f fingerprint) ToRowKey() string {
return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter)
}

View file

@ -14,6 +14,7 @@
package model
import (
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"time"
)
@ -24,6 +25,14 @@ type Watermark struct {
time.Time
}
// ToMetricHighWatermarkDTO builds a MetricHighWatermark DTO out of a given
// Watermark.
func (w Watermark) ToMetricHighWatermarkDTO() *dto.MetricHighWatermark {
return &dto.MetricHighWatermark{
Timestamp: proto.Int64(w.Time.Unix()),
}
}
// NewWatermarkFromHighWatermarkDTO builds Watermark from the provided
// dto.MetricHighWatermark object.
func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark {
@ -31,3 +40,10 @@ func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark {
time.Unix(*d.Timestamp, 0),
}
}
// NewWatermarkFromTime builds a new Watermark for the provided time.
func NewWatermarkFromTime(t time.Time) Watermark {
return Watermark{
t,
}
}

View file

@ -27,8 +27,8 @@ import (
// ----------------------------------------------------------------------------
// Raw data value types.
type Vector []*model.Sample
type Matrix []*model.SampleSet
type Vector model.Samples
type Matrix []model.SampleSet
type groupedAggregation struct {
labels model.Metric
@ -97,23 +97,23 @@ type Node interface {
// interface represents the type returned to the parent node.
type ScalarNode interface {
Node
Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue
Eval(timestamp time.Time, view *viewAdapter) model.SampleValue
}
type VectorNode interface {
Node
Eval(timestamp *time.Time, view *viewAdapter) Vector
Eval(timestamp time.Time, view *viewAdapter) Vector
}
type MatrixNode interface {
Node
Eval(timestamp *time.Time, view *viewAdapter) Matrix
EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix
Eval(timestamp time.Time, view *viewAdapter) Matrix
EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix
}
type StringNode interface {
Node
Eval(timestamp *time.Time, view *viewAdapter) string
Eval(timestamp time.Time, view *viewAdapter) string
}
// ----------------------------------------------------------------------------
@ -146,6 +146,8 @@ type (
// Vector literal, i.e. metric name plus labelset.
VectorLiteral struct {
labels model.LabelSet
// Fingerprints are populated from labels at query analysis time.
fingerprints model.Fingerprints
}
// A function of vector return type.
@ -176,6 +178,8 @@ type (
// Matrix literal, i.e. metric name plus labelset and timerange.
MatrixLiteral struct {
labels model.LabelSet
// Fingerprints are populated from labels at query analysis time.
fingerprints model.Fingerprints
interval time.Duration
}
)
@ -223,17 +227,17 @@ func (node MatrixLiteral) Children() []Node { return []Node{} }
func (node StringLiteral) Children() []Node { return []Node{} }
func (node StringFunctionCall) Children() []Node { return node.args }
func (node *ScalarLiteral) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue {
func (node *ScalarLiteral) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue {
return node.value
}
func (node *ScalarArithExpr) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue {
func (node *ScalarArithExpr) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue {
lhs := node.lhs.Eval(timestamp, view)
rhs := node.rhs.Eval(timestamp, view)
return evalScalarBinop(node.opType, lhs, rhs)
}
func (node *ScalarFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue {
func (node *ScalarFunctionCall) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue {
return node.function.callFn(timestamp, view, node.args).(model.SampleValue)
}
@ -259,7 +263,7 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector) {
if err != nil {
return
}
return node.Eval(&timestamp, viewAdapter)
return node.Eval(timestamp, viewAdapter)
}
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) {
@ -274,7 +278,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t
// TODO implement watchdog timer for long-running queries.
sampleSets := map[string]*model.SampleSet{}
for t := start; t.Before(end); t = t.Add(interval) {
vector := node.Eval(&t, viewAdapter)
vector := node.Eval(t, viewAdapter)
for _, sample := range vector {
samplePair := model.SamplePair{
Value: sample.Value,
@ -293,7 +297,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t
}
for _, sampleSet := range sampleSets {
matrix = append(matrix, sampleSet)
matrix = append(matrix, *sampleSet)
}
return
}
@ -308,23 +312,23 @@ func labelIntersection(metric1, metric2 model.Metric) model.Metric {
return intersection
}
func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp *time.Time) Vector {
func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp time.Time) Vector {
vector := Vector{}
for _, aggregation := range aggregations {
if node.aggrType == AVG {
aggregation.value = aggregation.value / model.SampleValue(aggregation.groupCount)
}
sample := &model.Sample{
sample := model.Sample{
Metric: aggregation.labels,
Value: aggregation.value,
Timestamp: *timestamp,
Timestamp: timestamp,
}
vector = append(vector, sample)
}
return vector
}
func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vector {
func (node *VectorAggregation) Eval(timestamp time.Time, view *viewAdapter) Vector {
vector := node.vector.Eval(timestamp, view)
result := map[string]*groupedAggregation{}
for _, sample := range vector {
@ -357,8 +361,8 @@ func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vec
return node.groupedAggregationsToVector(result, timestamp)
}
func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector {
values, err := view.GetValueAtTime(node.labels, timestamp)
func (node *VectorLiteral) Eval(timestamp time.Time, view *viewAdapter) Vector {
values, err := view.GetValueAtTime(node.fingerprints, timestamp)
if err != nil {
log.Printf("Unable to get vector values")
return Vector{}
@ -366,7 +370,7 @@ func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector
return values
}
func (node *VectorFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) Vector {
func (node *VectorFunctionCall) Eval(timestamp time.Time, view *viewAdapter) Vector {
return node.function.callFn(timestamp, view, node.args).(Vector)
}
@ -510,7 +514,7 @@ func labelsEqual(labels1, labels2 model.Metric) bool {
return true
}
func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vector {
func (node *VectorArithExpr) Eval(timestamp time.Time, view *viewAdapter) Vector {
lhs := node.lhs.Eval(timestamp, view)
result := Vector{}
if node.rhs.Type() == SCALAR {
@ -541,12 +545,12 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vecto
panic("Invalid vector arithmetic expression operands")
}
func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix {
func (node *MatrixLiteral) Eval(timestamp time.Time, view *viewAdapter) Matrix {
interval := &model.Interval{
OldestInclusive: timestamp.Add(-node.interval),
NewestInclusive: *timestamp,
NewestInclusive: timestamp,
}
values, err := view.GetRangeValues(node.labels, interval)
values, err := view.GetRangeValues(node.fingerprints, interval)
if err != nil {
log.Printf("Unable to get values for vector interval")
return Matrix{}
@ -554,12 +558,12 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix
return values
}
func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix {
func (node *MatrixLiteral) EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix {
interval := &model.Interval{
OldestInclusive: timestamp.Add(-node.interval),
NewestInclusive: *timestamp,
NewestInclusive: timestamp,
}
values, err := view.GetBoundaryValues(node.labels, interval)
values, err := view.GetBoundaryValues(node.fingerprints, interval)
if err != nil {
log.Printf("Unable to get boundary values for vector interval")
return Matrix{}
@ -579,11 +583,11 @@ func (matrix Matrix) Swap(i, j int) {
matrix[i], matrix[j] = matrix[j], matrix[i]
}
func (node *StringLiteral) Eval(timestamp *time.Time, view *viewAdapter) string {
func (node *StringLiteral) Eval(timestamp time.Time, view *viewAdapter) string {
return node.str
}
func (node *StringFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) string {
func (node *StringFunctionCall) Eval(timestamp time.Time, view *viewAdapter) string {
return node.function.callFn(timestamp, view, node.args).(string)
}

View file

@ -24,7 +24,7 @@ type Function struct {
name string
argTypes []ExprType
returnType ExprType
callFn func(timestamp *time.Time, view *viewAdapter, args []Node) interface{}
callFn func(timestamp time.Time, view *viewAdapter, args []Node) interface{}
}
func (function *Function) CheckArgTypes(args []Node) error {
@ -63,17 +63,17 @@ func (function *Function) CheckArgTypes(args []Node) error {
}
// === time() ===
func timeImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
func timeImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
return model.SampleValue(time.Now().Unix())
}
// === count(vector VectorNode) ===
func countImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
func countImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view)))
}
// === delta(matrix MatrixNode, isCounter ScalarNode) ===
func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
matrixNode := args[0].(MatrixNode)
isCounter := int(args[1].(ScalarNode).Eval(timestamp, view))
resultVector := Vector{}
@ -98,10 +98,10 @@ func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{}
lastValue = currentValue
}
resultValue := lastValue - samples.Values[0].Value + counterCorrection
resultSample := &model.Sample{
resultSample := model.Sample{
Metric: samples.Metric,
Value: resultValue,
Timestamp: *timestamp,
Timestamp: timestamp,
}
resultVector = append(resultVector, resultSample)
}
@ -109,7 +109,7 @@ func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{}
}
// === rate(node *MatrixNode) ===
func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
func rateImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
args = append(args, &ScalarLiteral{value: 1})
vector := deltaImpl(timestamp, view, args).(Vector)
@ -124,36 +124,36 @@ func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{}
}
// === sampleVectorImpl() ===
func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} {
func sampleVectorImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
return Vector{
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
"instance": "0",
},
Value: 10,
Timestamp: *timestamp,
Timestamp: timestamp,
},
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
"instance": "1",
},
Value: 20,
Timestamp: *timestamp,
Timestamp: timestamp,
},
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
"instance": "2",
},
Value: 30,
Timestamp: *timestamp,
Timestamp: timestamp,
},
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
@ -161,9 +161,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte
"group": "canary",
},
Value: 40,
Timestamp: *timestamp,
Timestamp: timestamp,
},
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
@ -171,9 +171,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte
"group": "canary",
},
Value: 40,
Timestamp: *timestamp,
Timestamp: timestamp,
},
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
@ -181,9 +181,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte
"group": "mytest",
},
Value: 40,
Timestamp: *timestamp,
Timestamp: timestamp,
},
&model.Sample{
model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "http_requests",
"job": "api-server",
@ -191,7 +191,7 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte
"group": "mytest",
},
Value: 40,
Timestamp: *timestamp,
Timestamp: timestamp,
},
}
}

View file

@ -28,65 +28,94 @@ var queryStorage metric.Storage = nil
type viewAdapter struct {
view metric.View
// TODO: use this.
stalenessPolicy *metric.StalenessPolicy
}
func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) {
var minDelta time.Duration
for _, candidate := range samples {
// Ignore samples outside of staleness policy window.
delta := candidate.Timestamp.Sub(*timestamp)
if delta < 0 {
delta = -delta
// interpolateSamples interpolates a value at a target time between two
// provided sample pairs.
func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *model.SamplePair {
dv := second.Value - first.Value
dt := second.Timestamp.Sub(first.Timestamp)
dDt := dv / model.SampleValue(dt)
offset := model.SampleValue(timestamp.Sub(first.Timestamp))
return &model.SamplePair{
Value: first.Value + (offset * dDt),
Timestamp: timestamp,
}
}
// chooseClosestSample chooses the closest sample of a list of samples
// surrounding a given target time. If samples are found both before and after
// the target time, the sample value is interpolated between these. Otherwise,
// the single closest sample is returned verbatim.
func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp time.Time) (sample *model.SamplePair) {
var closestBefore *model.SamplePair
var closestAfter *model.SamplePair
for _, candidate := range samples {
delta := candidate.Timestamp.Sub(timestamp)
// Samples before target time.
if delta < 0 {
// Ignore samples outside of staleness policy window.
if -delta > v.stalenessPolicy.DeltaAllowance {
continue
}
// Ignore samples that are farther away than what we've seen before.
if closestBefore != nil && candidate.Timestamp.Before(closestBefore.Timestamp) {
continue
}
sample := candidate
closestBefore = &sample
}
// Samples after target time.
if delta >= 0 {
// Ignore samples outside of staleness policy window.
if delta > v.stalenessPolicy.DeltaAllowance {
continue
}
// Skip sample if we've seen a closer one before this.
if sample != nil {
if delta > minDelta {
// Ignore samples that are farther away than samples we've seen before.
if closestAfter != nil && candidate.Timestamp.After(closestAfter.Timestamp) {
continue
}
sample := candidate
closestAfter = &sample
}
}
sample = &candidate
minDelta = delta
switch {
case closestBefore != nil && closestAfter != nil:
sample = interpolateSamples(closestBefore, closestAfter, timestamp)
case closestBefore != nil:
sample = closestBefore
default:
sample = closestAfter
}
return
}
func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) {
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels)
if err != nil {
return
}
func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) {
for _, fingerprint := range fingerprints {
sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp)
sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp)
samplePair := v.chooseClosestSample(sampleCandidates, timestamp)
m, err := queryStorage.GetMetricForFingerprint(fingerprint)
if err != nil {
continue
}
if samplePair != nil {
samples = append(samples, &model.Sample{
samples = append(samples, model.Sample{
Metric: *m,
Value: samplePair.Value,
Timestamp: *timestamp,
Timestamp: timestamp,
})
}
}
return
}
func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) {
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels)
if err != nil {
return
}
func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) {
for _, fingerprint := range fingerprints {
// TODO: change to GetBoundaryValues() once it has the right return type.
samplePairs := v.view.GetRangeValues(fingerprint, *interval)
@ -100,7 +129,7 @@ func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.I
continue
}
sampleSet := &model.SampleSet{
sampleSet := model.SampleSet{
Metric: *m,
Values: samplePairs,
}
@ -109,12 +138,7 @@ func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.I
return sampleSets, nil
}
func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) {
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels)
if err != nil {
return
}
func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) {
for _, fingerprint := range fingerprints {
samplePairs := v.view.GetRangeValues(fingerprint, *interval)
if samplePairs == nil {
@ -127,7 +151,7 @@ func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Inte
continue
}
sampleSet := &model.SampleSet{
sampleSet := model.SampleSet{
Metric: *m,
Values: samplePairs,
}

View file

@ -152,8 +152,8 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
return string(dataJSON)
}
func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string {
viewAdapter, err := viewAdapterForInstantQuery(node, *timestamp)
func EvalToString(node Node, timestamp time.Time, format OutputFormat) string {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp)
if err != nil {
panic(err)
}

View file

@ -66,6 +66,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) {
log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err)
return
}
n.fingerprints = fingerprints
for _, fingerprint := range fingerprints {
if !analyzer.IntervalRanges[fingerprint] {
analyzer.IntervalRanges[fingerprint] = true
@ -77,6 +78,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) {
log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err)
return
}
n.fingerprints = fingerprints
for _, fingerprint := range fingerprints {
interval := n.interval
// If an interval has already been recorded for this fingerprint, merge

View file

@ -45,7 +45,7 @@ AVG|SUM|MAX|MIN { yylval.str = yytext; return AGGR_OP }
[*/%] { yylval.str = yytext; return MULT_OP }
{D}+{U} { yylval.str = yytext; return DURATION }
{L}({L}|{D})+ { yylval.str = yytext; return IDENTIFIER }
{L}({L}|{D})* { yylval.str = yytext; return IDENTIFIER }
\-?{D}+(\.{D}*)? { num, err := strconv.ParseFloat(yytext, 32);
if (err != nil && err.(*strconv.NumError).Err == strconv.ErrSyntax) {

View file

@ -411,7 +411,7 @@ var yyrules []yyrule = []yyrule{{regexp.MustCompile("[^\\n]"), nil, []yystartcon
return yyactionreturn{DURATION, yyRT_USER_RETURN}
}
return yyactionreturn{0, yyRT_FALLTHROUGH}
}}, {regexp.MustCompile("([a-zA-Z_:])(([a-zA-Z_:])|([0-9]))+"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) {
}}, {regexp.MustCompile("([a-zA-Z_:])(([a-zA-Z_:])|([0-9]))*"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) {
defer func() {
if r := recover(); r != nil {
if r != "yyREJECT" {

View file

@ -179,6 +179,13 @@ var expressionTests = []struct {
},
fullRanges: 8,
intervalRanges: 0,
}, {
expr: "x{y='testvalue'}",
output: []string{
"x{y='testvalue'} => 100 @[%v]",
},
fullRanges: 0,
intervalRanges: 1,
// Invalid expressions that should fail to parse.
}, {
expr: "",
@ -241,7 +248,7 @@ func TestExpressions(t *testing.T) {
t.Errorf("Test should fail, but didn't")
}
failed := false
resultStr := ast.EvalToString(testExpr, &testEvalTime, ast.TEXT)
resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT)
resultLines := strings.Split(resultStr, "\n")
if len(exprTest.output) != len(resultLines) {

View file

@ -42,7 +42,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
vector := ast.Vector{}
for _, sampleSet := range matrix {
lastSample := sampleSet.Values[len(sampleSet.Values)-1]
vector = append(vector, &model.Sample{
vector = append(vector, model.Sample{
Metric: sampleSet.Metric,
Value: lastSample.Value,
Timestamp: lastSample.Timestamp,
@ -140,6 +140,14 @@ var testMatrix = ast.Matrix{
},
Values: getTestValueStream(0, 800, 80),
},
// Single-letter metric and label names.
{
Metric: model.Metric{
model.MetricNameLabel: "x",
"y": "testvalue",
},
Values: getTestValueStream(0, 100, 10),
},
}
var testVector = getTestVectorFromTestMatrix(testMatrix)

1
storage/metric/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
command-line-arguments.test

View file

@ -37,8 +37,9 @@ type curator struct {
// watermarks is the on-disk store that is scanned for high watermarks for
// given metrics.
watermarks raw.Persistence
// cutOff represents the most recent time up to which values will be curated.
cutOff time.Time
// recencyThreshold represents the most recent time up to which values will be
// curated.
recencyThreshold time.Duration
// groupingQuantity represents the number of samples below which encountered
// samples will be dismembered and reaggregated into larger groups.
groupingQuantity uint32
@ -48,9 +49,9 @@ type curator struct {
}
// newCurator builds a new curator for the given LevelDB databases.
func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
return curator{
cutOff: cutOff,
recencyThreshold: recencyThreshold,
stop: make(chan bool),
samples: samples,
curationState: curationState,
@ -60,18 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample
}
// run facilitates the curation lifecycle.
func (c curator) run() (err error) {
var (
decoder watermarkDecoder
filter = watermarkFilter{
func (c curator) run(instant time.Time) (err error) {
decoder := watermarkDecoder{}
filter := watermarkFilter{
stop: c.stop,
curationState: c.curationState,
groupSize: c.groupingQuantity,
recencyThreshold: c.recencyThreshold,
}
operator = watermarkOperator{
olderThan: c.cutOff,
operator := watermarkOperator{
olderThan: instant.Add(-1 * c.recencyThreshold),
groupSize: c.groupingQuantity,
curationState: c.curationState,
}
)
_, err = c.watermarks.ForEach(decoder, filter, operator)
@ -125,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro
// watermarkFilter determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
type watermarkFilter struct {
// curationState is the table of CurationKey to CurationValues that remark on
// curationState is the table of CurationKey to CurationValues that rema
// far along the curation process has gone for a given metric fingerprint.
curationState raw.Persistence
// stop, when non-empty, instructs the filter to stop operation.
stop chan bool
// groupSize refers to the target groupSize from the curator.
groupSize uint32
// recencyThreshold refers to the target recencyThreshold from the curator.
recencyThreshold time.Duration
}
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
var (
fingerprint = key.(model.Fingerprint)
watermark = value.(model.Watermark)
curationKey = fingerprint.ToDTO()
rawCurationValue []byte
err error
curationValue = &dto.CurationValue{}
)
fingerprint := key.(model.Fingerprint)
watermark := value.(model.Watermark)
curationKey := &dto.CurationKey{
Fingerprint: fingerprint.ToDTO(),
MinimumGroupSize: proto.Uint32(w.groupSize),
OlderThan: proto.Int64(int64(w.recencyThreshold)),
}
curationValue := &dto.CurationValue{}
rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey))
if err != nil {
panic(err)
}
@ -228,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er
MinimumGroupSize: proto.Uint32(w.groupSize),
}
curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey))
curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey))
return
}
@ -246,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod
}
)
rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey))
if err != nil {
return
}

View file

@ -14,200 +14,510 @@
package metric
import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"sort"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"testing"
"time"
)
type (
keyPair struct {
fingerprint model.Fingerprint
time time.Time
curationState struct {
fingerprint string
groupSize int
recencyThreshold time.Duration
lastCurated time.Time
}
fakeCurationStates map[model.Fingerprint]time.Time
fakeSamples map[keyPair][]float32
fakeWatermarks map[model.Fingerprint]time.Time
watermarkState struct {
fingerprint string
lastAppended time.Time
}
sample struct {
time time.Time
value float32
}
sampleGroup struct {
fingerprint string
values []sample
}
in struct {
curationStates fakeCurationStates
samples fakeSamples
watermarks fakeWatermarks
cutOff time.Time
grouping uint32
}
out struct {
curationStates fakeCurationStates
samples fakeSamples
watermarks fakeWatermarks
curationStates fixture.Pairs
watermarkStates fixture.Pairs
sampleGroups fixture.Pairs
recencyThreshold time.Duration
groupSize uint32
}
)
func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
func (c curationState) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(&dto.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(),
MinimumGroupSize: proto.Uint32(uint32(c.groupSize)),
OlderThan: proto.Int64(int64(c.recencyThreshold)),
})
func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
func (c fakeCurationStates) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeCurationStates) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeCurationStates) Close() error {
panic("unimplemented")
}
func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
var (
fingerprints model.Fingerprints
)
for f := range c {
fingerprints = append(fingerprints, f)
}
sort.Sort(fingerprints)
for _, k := range fingerprints {
v := c[k]
var (
decodedKey interface{}
decodedValue interface{}
)
decodedKey, err = d.DecodeKey(k)
if err != nil {
continue
}
decodedValue, err = d.DecodeValue(v)
if err != nil {
continue
}
switch f.Filter(decodedKey, decodedValue) {
case storage.STOP:
return
case storage.SKIP:
continue
case storage.ACCEPT:
opErr := o.Operate(decodedKey, decodedValue)
if opErr != nil {
if opErr.Continuable {
continue
}
break
}
}
}
value = coding.NewProtocolBuffer(&dto.CurationValue{
LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
})
return
}
func (c fakeCurationStates) Commit(_ raw.Batch) error {
panic("unimplemented")
func (w watermarkState) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
return
}
func (c fakeSamples) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
func (s sampleGroup) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(&dto.SampleKey{
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(),
Timestamp: indexable.EncodeTime(s.values[0].time),
LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()),
SampleCount: proto.Uint32(uint32(len(s.values))),
})
func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
series := &dto.SampleValueSeries{}
func (c fakeSamples) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
for _, value := range s.values {
series.Value = append(series.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(value.time.Unix()),
Value: proto.Float32(float32(value.value)),
})
}
func (c fakeSamples) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
value = coding.NewProtocolBuffer(series)
func (c fakeSamples) Close() error {
panic("unimplemented")
}
func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
panic("unimplemented")
}
func (c fakeSamples) Commit(_ raw.Batch) (err error) {
panic("unimplemented")
}
func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
func (c fakeWatermarks) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeWatermarks) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeWatermarks) Close() error {
panic("unimplemented")
}
func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
panic("unimplemented")
}
func (c fakeWatermarks) Commit(_ raw.Batch) (err error) {
panic("unimplemented")
return
}
func TestCurator(t *testing.T) {
var (
scenarios = []struct {
in in
out out
}{
{
in: in{
curationStates: fakeCurationStates{
model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute),
model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute),
recencyThreshold: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
curationState{
fingerprint: "0001-A-1-Z",
groupSize: 5,
recencyThreshold: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
},
curationState{
fingerprint: "0002-A-2-Z",
groupSize: 5,
recencyThreshold: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
// This rule should effectively be ignored.
curationState{
fingerprint: "0002-A-2-Z",
groupSize: 2,
recencyThreshold: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
},
watermarkStates: fixture.Pairs{
watermarkState{
fingerprint: "0001-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
watermarkState{
fingerprint: "0002-A-2-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
},
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 90 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 85 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 80 * time.Minute),
value: 2,
},
{
time: testInstant.Add(-1 * 75 * time.Minute),
value: 3,
},
{
time: testInstant.Add(-1 * 70 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 65 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 60 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 55 * time.Minute),
value: 2,
},
{
time: testInstant.Add(-1 * 50 * time.Minute),
value: 3,
},
{
time: testInstant.Add(-1 * 45 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 40 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 35 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 30 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 25 * time.Minute),
value: 0,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 35 * time.Minute),
value: 1,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 30 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 90 * time.Minute),
value: 0,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 89 * time.Minute),
value: 1,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 88 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 87 * time.Minute),
value: 3,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 86 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 85 * time.Minute),
value: 5,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 84 * time.Minute),
value: 6,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 83 * time.Minute),
value: 7,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 82 * time.Minute),
value: 8,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 81 * time.Minute),
value: 9,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 80 * time.Minute),
value: 10,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 79 * time.Minute),
value: 11,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 78 * time.Minute),
value: 12,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 77 * time.Minute),
value: 13,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 76 * time.Minute),
value: 14,
},
{
time: testInstant.Add(-1 * 75 * time.Minute),
value: 15,
},
{
time: testInstant.Add(-1 * 74 * time.Minute),
value: 16,
},
{
time: testInstant.Add(-1 * 73 * time.Minute),
value: 17,
},
{
time: testInstant.Add(-1 * 72 * time.Minute),
value: 18,
},
{
time: testInstant.Add(-1 * 71 * time.Minute),
value: 19,
},
{
time: testInstant.Add(-1 * 70 * time.Minute),
value: 20,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 69 * time.Minute),
value: 21,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 68 * time.Minute),
value: 22,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 67 * time.Minute),
value: 23,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 66 * time.Minute),
value: 24,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 65 * time.Minute),
value: 25,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 64 * time.Minute),
value: 26,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 63 * time.Minute),
value: 27,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 62 * time.Minute),
value: 28,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 61 * time.Minute),
value: 29,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 60 * time.Minute),
value: 30,
},
},
},
},
watermarks: fakeWatermarks{},
samples: fakeSamples{},
cutOff: testInstant.Add(5 * time.Minute),
grouping: 5,
},
},
}
)
for _, scenario := range scenarios {
var (
in = scenario.in
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
defer curatorDirectory.Close()
curationStates = in.curationStates
samples = in.samples
watermarks = in.watermarks
cutOff = in.cutOff
grouping = in.grouping
)
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
defer watermarkDirectory.Close()
_ = newCurator(cutOff, grouping, curationStates, samples, watermarks)
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer samples.Close()
c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates)
c.run(testInstant)
}
}

View file

@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
Timestamp: upperSeek,
}
raw, err := coding.NewProtocolBufferEncoder(key).Encode()
raw, err := coding.NewProtocolBuffer(key).Encode()
if err != nil {
panic(err)
}
@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
key.Timestamp = lowerSeek
raw, err = coding.NewProtocolBufferEncoder(key).Encode()
raw, err = coding.NewProtocolBuffer(key).Encode()
if err != nil {
panic(err)
}

View file

@ -63,6 +63,8 @@ var (
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
}
curationDuration = metrics.NewCounter()
curationDurations = metrics.NewHistogram(diskLatencyHistogram)
storageOperations = metrics.NewCounter()
storageOperationDurations = metrics.NewCounter()
storageLatency = metrics.NewHistogram(diskLatencyHistogram)

View file

@ -339,7 +339,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO())
}
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
}
err = l.labelNameToFingerprints.Commit(batch)
@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO())
}
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
}
err = l.labelSetToFingerprints.Commit(batch)
@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
defer batch.Close()
for fingerprint, metric := range metrics {
key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
key := coding.NewProtocolBuffer(fingerprint.ToDTO())
value := coding.NewProtocolBuffer(model.MetricToDTO(metric))
batch.Put(key, value)
}
@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
// WART: We should probably encode simple fingerprints.
for _, metric := range absentMetrics {
key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
batch.Put(key, key)
}
@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
value = &dto.MetricHighWatermark{}
raw []byte
newestSampleTimestamp = samples[len(samples)-1].Timestamp
keyEncoded = coding.NewProtocolBufferEncoder(key)
keyEncoded = coding.NewProtocolBuffer(key)
)
key.Signature = proto.String(fingerprint.ToRowKey())
@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
}
}
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value))
batch.Put(keyEncoded, coding.NewProtocolBuffer(value))
mutationCount++
}
@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
})
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
}
}
@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
dtoKey := coding.NewProtocolBuffer(dto)
value, err = l.metricMembershipIndex.Has(dtoKey)
return
@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
dtoKey := coding.NewProtocolBuffer(dto)
value, err = l.labelSetToFingerprints.Has(dtoKey)
return
@ -782,7 +782,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
dtoKey := coding.NewProtocolBuffer(dto)
value, err = l.labelNameToFingerprints.Has(dtoKey)
return
@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
sets := []utility.Set{}
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO))
f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO))
if err != nil {
return fps, err
}
@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}()
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName)))
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName)))
if err != nil {
return
}
@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}()
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f)))
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f)))
if err != nil {
return
}
@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
Timestamp: indexable.EncodeTime(t),
}
e, err := coding.NewProtocolBufferEncoder(k).Encode()
e, err := coding.NewProtocolBuffer(k).Encode()
if err != nil {
return
}
@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.
Timestamp: indexable.EncodeTime(i.OldestInclusive),
}
e, err := coding.NewProtocolBufferEncoder(k).Encode()
e, err := coding.NewProtocolBuffer(k).Encode()
if err != nil {
return
}

View file

@ -21,7 +21,9 @@ import (
)
var (
testInstant = time.Time{}
// ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20``
usEastern, _ = time.LoadLocation("US/Eastern")
testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern)
)
func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {

View file

@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
}
// Try seeking to target key.
rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode()
rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode()
iterator.Seek(rawKey)
foundKey, err := extractSampleKey(iterator)
@ -562,7 +562,7 @@ func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fin
if err != nil {
return
}
diskFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
diskFingerprints, err := t.diskStorage.GetFingerprintsForLabelSet(labelSet)
if err != nil {
return
}

View file

@ -14,7 +14,6 @@
package metric
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
"sort"
@ -380,9 +379,7 @@ func testMakeView(t test.Tester) {
}
}
start := time.Now()
tiered.Flush()
fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data))
requestBuilder := NewViewRequestBuilder()
@ -530,3 +527,64 @@ func TestGetAllValuesForLabel(t *testing.T) {
}
}
}
func TestGetFingerprintsForLabelSet(t *testing.T) {
tiered, closer := newTestTieredStorage(t)
defer closer.Close()
memorySample := model.Sample{
Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/foo"},
}
diskSample := model.Sample{
Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/bar"},
}
if err := tiered.(*tieredStorage).memoryArena.AppendSample(memorySample); err != nil {
t.Fatalf("Failed to add fixture data: %s", err)
}
if err := tiered.(*tieredStorage).diskStorage.AppendSample(diskSample); err != nil {
t.Fatalf("Failed to add fixture data: %s", err)
}
tiered.Flush()
scenarios := []struct {
labels model.LabelSet
fpCount int
}{
{
labels: model.LabelSet{},
fpCount: 0,
}, {
labels: model.LabelSet{
model.MetricNameLabel: "http_requests",
},
fpCount: 2,
}, {
labels: model.LabelSet{
model.MetricNameLabel: "http_requests",
"method": "/foo",
},
fpCount: 1,
}, {
labels: model.LabelSet{
model.MetricNameLabel: "http_requests",
"method": "/bar",
},
fpCount: 1,
}, {
labels: model.LabelSet{
model.MetricNameLabel: "http_requests",
"method": "/baz",
},
fpCount: 0,
},
}
for i, scenario := range scenarios {
fingerprints, err := tiered.GetFingerprintsForLabelSet(scenario.labels)
if err != nil {
t.Fatalf("%d. Error getting metric names: %s", i, err)
}
if len(fingerprints) != scenario.fpCount {
t.Fatalf("%d. Expected metric count %d, got %d", i, scenario.fpCount, len(fingerprints))
}
}
}

View file

@ -21,7 +21,7 @@ import (
)
var (
existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{})
existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{})
)
type LevelDBMembershipIndex struct {

View file

@ -0,0 +1,115 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility/test"
)
const (
cacheCapacity = 0
bitsPerBloomFilterEncoded = 0
)
type (
// Pair models a prospective (key, value) double that will be committed to
// a database.
Pair interface {
Get() (key, value coding.Encoder)
}
// Pairs models a list of Pair for disk committing.
Pairs []Pair
// Preparer readies a LevelDB store for a given raw state given the fixtures
// definitions passed into it.
Preparer interface {
// Prepare furnishes the database and returns its path along with any
// encountered anomalies.
Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory
}
FixtureFactory interface {
// HasNext indicates whether the FixtureFactory has more pending fixture
// data to build.
HasNext() (has bool)
// Next emits the next (key, value) double for storage.
Next() (key coding.Encoder, value coding.Encoder)
}
preparer struct {
tester test.Tester
}
cassetteFactory struct {
index int
count int
pairs Pairs
}
)
func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) {
t = test.NewTemporaryDirectory(n, p.tester)
persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded)
if err != nil {
defer t.Close()
p.tester.Fatal(err)
}
defer func() {
err = persistence.Close()
if err != nil {
p.tester.Fatal(err)
}
}()
for f.HasNext() {
key, value := f.Next()
err = persistence.Put(key, value)
if err != nil {
defer t.Close()
p.tester.Fatal(err)
}
}
return
}
func (f cassetteFactory) HasNext() bool {
return f.index < f.count
}
func (f *cassetteFactory) Next() (key, value coding.Encoder) {
key, value = f.pairs[f.index].Get()
f.index++
return
}
// NewPreparer creates a new Preparer for use in testing scenarios.
func NewPreparer(t test.Tester) Preparer {
return preparer{t}
}
// NewCassetteFactory builds a new FixtureFactory that uses Pairs as the basis
// for generated fixture data.
func NewCassetteFactory(pairs Pairs) FixtureFactory {
return &cassetteFactory{
pairs: pairs,
count: len(pairs),
}
}

View file

@ -28,31 +28,33 @@ const (
NilCloser = nilCloser(true)
)
type Closer interface {
type (
Closer interface {
// Close reaps the underlying directory and its children. The directory
// could be deleted by its users already.
Close()
}
}
type nilCloser bool
nilCloser bool
func (c nilCloser) Close() {
}
// TemporaryDirectory models a closeable path for transient POSIX disk
// activities.
type TemporaryDirectory interface {
// TemporaryDirectory models a closeable path for transient POSIX disk
// activities.
TemporaryDirectory interface {
Closer
// Path returns the underlying path for access.
Path() string
}
}
// temporaryDirectory is kept as a private type due to private fields and their
// interactions.
type temporaryDirectory struct {
// temporaryDirectory is kept as a private type due to private fields and
// their interactions.
temporaryDirectory struct {
path string
tester Tester
}
)
func (c nilCloser) Close() {
}
func (t temporaryDirectory) Close() {

View file

@ -44,7 +44,7 @@ func (serv MetricsService) Query(expr string, formatJson string) (result string)
rb.SetContentType(gorest.Text_Plain)
}
return ast.EvalToString(exprNode, &timestamp, format)
return ast.EvalToString(exprNode, timestamp, format)
}
func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string {