From b836066c71938646df84e17b3596adc12d06facd Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 27 Mar 2013 14:06:30 +0100 Subject: [PATCH 01/16] Eliminate need to get fingerprints during query execution time. --- rules/ast/ast.go | 10 +++++++--- rules/ast/persistence_adapter.go | 21 +++------------------ rules/ast/query_analyzer.go | 2 ++ 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/rules/ast/ast.go b/rules/ast/ast.go index a0e7c66e38..95f82c42c0 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -146,6 +146,8 @@ type ( // Vector literal, i.e. metric name plus labelset. VectorLiteral struct { labels model.LabelSet + // Fingerprints are populated from labels at query analysis time. + fingerprints model.Fingerprints } // A function of vector return type. @@ -176,6 +178,8 @@ type ( // Matrix literal, i.e. metric name plus labelset and timerange. MatrixLiteral struct { labels model.LabelSet + // Fingerprints are populated from labels at query analysis time. + fingerprints model.Fingerprints interval time.Duration } ) @@ -358,7 +362,7 @@ func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vec } func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector { - values, err := view.GetValueAtTime(node.labels, timestamp) + values, err := view.GetValueAtTime(node.fingerprints, timestamp) if err != nil { log.Printf("Unable to get vector values") return Vector{} @@ -546,7 +550,7 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix OldestInclusive: timestamp.Add(-node.interval), NewestInclusive: *timestamp, } - values, err := view.GetRangeValues(node.labels, interval) + values, err := view.GetRangeValues(node.fingerprints, interval) if err != nil { log.Printf("Unable to get values for vector interval") return Matrix{} @@ -559,7 +563,7 @@ func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapte OldestInclusive: timestamp.Add(-node.interval), 
NewestInclusive: *timestamp, } - values, err := view.GetBoundaryValues(node.labels, interval) + values, err := view.GetBoundaryValues(node.fingerprints, interval) if err != nil { log.Printf("Unable to get boundary values for vector interval") return Matrix{} diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 9fa82c6757..7b136b32ca 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -57,12 +57,7 @@ func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp return } -func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp *time.Time) (samples []*model.Sample, err error) { for _, fingerprint := range fingerprints { sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp) @@ -81,12 +76,7 @@ func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time return } -func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { for _, fingerprint := range fingerprints { // TODO: change to GetBoundaryValues() once it has the right return type. 
samplePairs := v.view.GetRangeValues(fingerprint, *interval) @@ -109,12 +99,7 @@ func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.I return sampleSets, nil } -func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { for _, fingerprint := range fingerprints { samplePairs := v.view.GetRangeValues(fingerprint, *interval) if samplePairs == nil { diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index 17578509d6..2c6ec92f86 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -66,6 +66,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return } + n.fingerprints = fingerprints for _, fingerprint := range fingerprints { if !analyzer.IntervalRanges[fingerprint] { analyzer.IntervalRanges[fingerprint] = true @@ -77,6 +78,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return } + n.fingerprints = fingerprints for _, fingerprint := range fingerprints { interval := n.interval // If an interval has already been recorded for this fingerprint, merge From 6dcaa2880693b228ceaa44b245ba64f160d0e86a Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 27 Mar 2013 12:52:08 +0100 Subject: [PATCH 02/16] Include LevelDB fixture generators for curator. This will help reduce common boilerplate for our test process with respect to LevelDB-related things. 
--- storage/raw/leveldb/test/fixtures.go | 121 +++++++++++++++++++++++++++ utility/test/directory.go | 46 +++++----- 2 files changed, 145 insertions(+), 22 deletions(-) create mode 100644 storage/raw/leveldb/test/fixtures.go diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go new file mode 100644 index 0000000000..81346f938c --- /dev/null +++ b/storage/raw/leveldb/test/fixtures.go @@ -0,0 +1,121 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/storage/raw" + "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/utility/test" +) + +const ( + cacheCapacity = 0 + bitsPerBloomFilterEncoded = 0 +) + +type ( + // Pair models a prospective (key, value) double that will be committed to + // a database. + Pair interface { + Get() (key, value coding.Encoder) + } + + // Pairs models a list of Pair for disk committing. + Pairs []Pair + + // Preparer readies a LevelDB store for a given raw state given the fixtures + // definitions passed into it. + Preparer interface { + // Prepare furnishes the database and returns its path along with any + // encountered anomalies. 
+ Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory + } + + FixtureFactory interface { + // HasNext indicates whether the FixtureFactory has more pending fixture + // data to build. + HasNext() (has bool) + // Next emits the next (key, value) double for storage. + Next() (key coding.Encoder, value coding.Encoder) + } + + preparer struct { + tester test.Tester + } + + cassetteFactory struct { + index int + count int + pairs Pairs + } +) + +func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { + t = test.NewTemporaryDirectory(n, p.tester) + + var ( + persistence raw.Persistence + err error + ) + + persistence, err = leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) + if err != nil { + defer t.Close() + p.tester.Fatal(err) + } + + for f.HasNext() { + var ( + key coding.Encoder + value coding.Encoder + ) + + key, value = f.Next() + + err = persistence.Put(key, value) + if err != nil { + defer t.Close() + p.tester.Fatal(err) + } + } + + return +} + +func (f cassetteFactory) HasNext() bool { + return f.index < f.count +} + +func (f *cassetteFactory) Next() (key, value coding.Encoder) { + key, value = f.pairs[f.index].Get() + + f.index++ + + return +} + +// NewPreparer creates a new Preparer for use in testing scenarios. +func NewPreparer(t test.Tester) Preparer { + return preparer{t} +} + +// NewCassetteFactory builds a new FixtureFactory that uses Pairs as the basis +// for generated fixture data. +func NewCassetteFactory(pairs Pairs) FixtureFactory { + return &cassetteFactory{ + pairs: pairs, + count: len(pairs), + } +} diff --git a/utility/test/directory.go b/utility/test/directory.go index 537d8ade42..4263fcf4bf 100644 --- a/utility/test/directory.go +++ b/utility/test/directory.go @@ -28,33 +28,35 @@ const ( NilCloser = nilCloser(true) ) -type Closer interface { - // Close reaps the underlying directory and its children. The directory - // could be deleted by its users already. 
- Close() -} +type ( + Closer interface { + // Close reaps the underlying directory and its children. The directory + // could be deleted by its users already. + Close() + } -type nilCloser bool + nilCloser bool + + // TemporaryDirectory models a closeable path for transient POSIX disk + // activities. + TemporaryDirectory interface { + Closer + + // Path returns the underlying path for access. + Path() string + } + + // temporaryDirectory is kept as a private type due to private fields and + // their interactions. + temporaryDirectory struct { + path string + tester Tester + } +) func (c nilCloser) Close() { } -// TemporaryDirectory models a closeable path for transient POSIX disk -// activities. -type TemporaryDirectory interface { - Closer - - // Path returns the underlying path for access. - Path() string -} - -// temporaryDirectory is kept as a private type due to private fields and their -// interactions. -type temporaryDirectory struct { - path string - tester Tester -} - func (t temporaryDirectory) Close() { err := os.RemoveAll(t.path) if err != nil { From 542bb6748e322333cdefcd6ea9b797a77c495137 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 27 Mar 2013 14:06:30 +0100 Subject: [PATCH 03/16] Eliminate need to get fingerprints during query execution time. --- rules/ast/ast.go | 10 +++++++--- rules/ast/persistence_adapter.go | 21 +++------------------ rules/ast/query_analyzer.go | 2 ++ 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/rules/ast/ast.go b/rules/ast/ast.go index a0e7c66e38..95f82c42c0 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -146,6 +146,8 @@ type ( // Vector literal, i.e. metric name plus labelset. VectorLiteral struct { labels model.LabelSet + // Fingerprints are populated from labels at query analysis time. + fingerprints model.Fingerprints } // A function of vector return type. @@ -176,6 +178,8 @@ type ( // Matrix literal, i.e. metric name plus labelset and timerange. 
MatrixLiteral struct { labels model.LabelSet + // Fingerprints are populated from labels at query analysis time. + fingerprints model.Fingerprints interval time.Duration } ) @@ -358,7 +362,7 @@ func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vec } func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector { - values, err := view.GetValueAtTime(node.labels, timestamp) + values, err := view.GetValueAtTime(node.fingerprints, timestamp) if err != nil { log.Printf("Unable to get vector values") return Vector{} @@ -546,7 +550,7 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix OldestInclusive: timestamp.Add(-node.interval), NewestInclusive: *timestamp, } - values, err := view.GetRangeValues(node.labels, interval) + values, err := view.GetRangeValues(node.fingerprints, interval) if err != nil { log.Printf("Unable to get values for vector interval") return Matrix{} @@ -559,7 +563,7 @@ func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapte OldestInclusive: timestamp.Add(-node.interval), NewestInclusive: *timestamp, } - values, err := view.GetBoundaryValues(node.labels, interval) + values, err := view.GetBoundaryValues(node.fingerprints, interval) if err != nil { log.Printf("Unable to get boundary values for vector interval") return Matrix{} diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 9fa82c6757..7b136b32ca 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -57,12 +57,7 @@ func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp return } -func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp *time.Time) (samples 
[]*model.Sample, err error) { for _, fingerprint := range fingerprints { sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp) @@ -81,12 +76,7 @@ func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time return } -func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { for _, fingerprint := range fingerprints { // TODO: change to GetBoundaryValues() once it has the right return type. samplePairs := v.view.GetRangeValues(fingerprint, *interval) @@ -109,12 +99,7 @@ func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.I return sampleSets, nil } -func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - +func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { for _, fingerprint := range fingerprints { samplePairs := v.view.GetRangeValues(fingerprint, *interval) if samplePairs == nil { diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index 17578509d6..2c6ec92f86 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -66,6 +66,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return } + n.fingerprints = fingerprints for _, fingerprint := range fingerprints { if !analyzer.IntervalRanges[fingerprint] { analyzer.IntervalRanges[fingerprint] = true 
@@ -77,6 +78,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return } + n.fingerprints = fingerprints for _, fingerprint := range fingerprints { interval := n.interval // If an interval has already been recorded for this fingerprint, merge From 8e15a4282f315ec4cf8c565ed4eaba675b64c8a2 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 26 Mar 2013 12:33:48 +0100 Subject: [PATCH 04/16] Test data for the curator. --- Makefile | 4 +- model/fingerprinting.go | 4 + model/watermark.go | 16 + rules/ast/ast.go | 4 +- storage/metric/curator.go | 3 +- storage/metric/curator_test.go | 584 ++++++++++++++++++++------- storage/metric/instrumentation.go | 2 + storage/metric/test_helper.go | 4 +- storage/metric/tiered_test.go | 3 - storage/raw/leveldb/test/fixtures.go | 6 + 10 files changed, 469 insertions(+), 161 deletions(-) diff --git a/Makefile b/Makefile index 9b9582be71..e634862405 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,8 @@ build: $(MAKE) -C model $(MAKE) -C web go build ./... 
+ +binary: build go build -o prometheus.build clean: @@ -44,4 +46,4 @@ search_index: documentation: search_index godoc -http=:6060 -index -index_files='search_index' -.PHONY: advice build clean documentation format search_index test +.PHONY: advice binary build clean documentation format search_index test diff --git a/model/fingerprinting.go b/model/fingerprinting.go index 6dd1257f95..efdd64fa8f 100644 --- a/model/fingerprinting.go +++ b/model/fingerprinting.go @@ -120,6 +120,10 @@ type fingerprint struct { lastCharacterOfLastLabelValue string } +func (f fingerprint) String() string { + return f.ToRowKey() +} + func (f fingerprint) ToRowKey() string { return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter) } diff --git a/model/watermark.go b/model/watermark.go index 93cf47c91d..e6126bd419 100644 --- a/model/watermark.go +++ b/model/watermark.go @@ -14,6 +14,7 @@ package model import ( + "code.google.com/p/goprotobuf/proto" dto "github.com/prometheus/prometheus/model/generated" "time" ) @@ -24,6 +25,14 @@ type Watermark struct { time.Time } +// ToMetricHighWatermarkDTO builds a MetricHighWatermark DTO out of a given +// Watermark. +func (w Watermark) ToMetricHighWatermarkDTO() *dto.MetricHighWatermark { + return &dto.MetricHighWatermark{ + Timestamp: proto.Int64(w.Time.Unix()), + } +} + // NewWatermarkFromHighWatermarkDTO builds Watermark from the provided // dto.MetricHighWatermark object. func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { @@ -31,3 +40,10 @@ func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { time.Unix(*d.Timestamp, 0), } } + +// NewWatermarkFromTime builds a new Watermark for the provided time. 
+func NewWatermarkFromTime(t time.Time) Watermark { + return Watermark{ + t, + } +} diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 95f82c42c0..3be9c49da7 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -177,10 +177,10 @@ type ( type ( // Matrix literal, i.e. metric name plus labelset and timerange. MatrixLiteral struct { - labels model.LabelSet + labels model.LabelSet // Fingerprints are populated from labels at query analysis time. fingerprints model.Fingerprints - interval time.Duration + interval time.Duration } ) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index fb3d64646c..62918dc430 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -64,7 +64,8 @@ func (c curator) run() (err error) { var ( decoder watermarkDecoder filter = watermarkFilter{ - stop: c.stop, + stop: c.stop, + curationState: c.curationState, } operator = watermarkOperator{ olderThan: c.cutOff, diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 5adb75c2ee..3cce655582 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -14,184 +14,464 @@ package metric import ( + "code.google.com/p/goprotobuf/proto" "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/raw" - "sort" + dto "github.com/prometheus/prometheus/model/generated" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" ) type ( - keyPair struct { - fingerprint model.Fingerprint - time time.Time + curationState struct { + fingerprint string + groupSize int + olderThan time.Duration + lastCurated time.Time } - fakeCurationStates map[model.Fingerprint]time.Time - fakeSamples map[keyPair][]float32 - fakeWatermarks map[model.Fingerprint]time.Time - - in struct { - curationStates fakeCurationStates - samples fakeSamples - 
watermarks fakeWatermarks - cutOff time.Time - grouping uint32 + watermarkState struct { + fingerprint string + lastAppended time.Time } - out struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks + sample struct { + time time.Time + value float32 + } + + sampleGroup struct { + fingerprint string + values []sample + } + + context struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs } ) -func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (c curationState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), + MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), + OlderThan: proto.Int64(int64(c.olderThan)), + }) -func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeCurationStates) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Close() error { - panic("unimplemented") -} - -func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - var ( - fingerprints model.Fingerprints - ) - - for f := range c { - fingerprints = append(fingerprints, f) - } - - sort.Sort(fingerprints) - - for _, k := range fingerprints { - v := c[k] - - var ( - decodedKey interface{} - decodedValue interface{} - ) - - decodedKey, err = d.DecodeKey(k) - if err != nil { - continue - } - - decodedValue, err = d.DecodeValue(v) - if err != nil { - continue - } - - switch f.Filter(decodedKey, decodedValue) { - case storage.STOP: - return - case storage.SKIP: - continue - case storage.ACCEPT: - opErr := o.Operate(decodedKey, decodedValue) - if opErr != nil { - if opErr.Continuable { - 
continue - } - break - } - } - } + value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), + }) return } -func (c fakeCurationStates) Commit(_ raw.Batch) error { - panic("unimplemented") +func (w watermarkState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + return } -func (c fakeSamples) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (s sampleGroup) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), + Timestamp: indexable.EncodeTime(s.values[0].time), + LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), + SampleCount: proto.Uint32(uint32(len(s.values))), + }) -func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} + series := &dto.SampleValueSeries{} -func (c fakeSamples) Drop(_ coding.Encoder) error { - panic("unimplemented") -} + for _, value := range s.values { + series.Value = append(series.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(value.time.Unix()), + Value: proto.Float32(float32(value.value)), + }) + } -func (c fakeSamples) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} + value = coding.NewProtocolBufferEncoder(series) -func (c fakeSamples) Close() error { - panic("unimplemented") -} - -func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeSamples) Commit(_ raw.Batch) (err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, 
error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Close() error { - panic("unimplemented") -} - -func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Commit(_ raw.Batch) (err error) { - panic("unimplemented") + return } func TestCurator(t *testing.T) { var ( scenarios = []struct { - in in - out out + context context }{ { - in: in{ - curationStates: fakeCurationStates{ - model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute), - model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute), + context: context{ + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + groupSize: 5, + olderThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + }, + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 5, + olderThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: 
testInstant.Add(-1 * 65 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 55 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 50 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 45 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 40 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 25 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 89 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 88 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 87 * time.Minute), + value: 3, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 86 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 5, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: 
testInstant.Add(-1 * 84 * time.Minute), + value: 6, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 83 * time.Minute), + value: 7, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 82 * time.Minute), + value: 8, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 81 * time.Minute), + value: 9, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 10, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 79 * time.Minute), + value: 11, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 78 * time.Minute), + value: 12, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 77 * time.Minute), + value: 13, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 76 * time.Minute), + value: 14, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 15, + }, + { + time: testInstant.Add(-1 * 74 * time.Minute), + value: 16, + }, + { + time: testInstant.Add(-1 * 73 * time.Minute), + value: 17, + }, + { + time: testInstant.Add(-1 * 72 * time.Minute), + value: 18, + }, + { + time: testInstant.Add(-1 * 71 * time.Minute), + value: 19, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 20, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 69 * time.Minute), + value: 21, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 68 * time.Minute), + value: 22, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: 
testInstant.Add(-1 * 67 * time.Minute), + value: 23, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 66 * time.Minute), + value: 24, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 25, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 64 * time.Minute), + value: 26, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 63 * time.Minute), + value: 27, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 62 * time.Minute), + value: 28, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 61 * time.Minute), + value: 29, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 30, + }, + }, + }, }, - watermarks: fakeWatermarks{}, - samples: fakeSamples{}, - cutOff: testInstant.Add(5 * time.Minute), - grouping: 5, }, }, } @@ -199,15 +479,13 @@ func TestCurator(t *testing.T) { for _, scenario := range scenarios { var ( - in = scenario.in - - curationStates = in.curationStates - samples = in.samples - watermarks = in.watermarks - cutOff = in.cutOff - grouping = in.grouping + curatorDirectory = fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + watermarkDirectory = fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + sampleDirectory = fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) ) + defer curatorDirectory.Close() + defer watermarkDirectory.Close() + defer sampleDirectory.Close() - _ = newCurator(cutOff, grouping, curationStates, samples, 
watermarks) } } diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 42979a6a49..cb3381fb03 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -63,6 +63,8 @@ var ( ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, } + curationDuration = metrics.NewCounter() + curationDurations = metrics.NewHistogram(diskLatencyHistogram) storageOperations = metrics.NewCounter() storageOperationDurations = metrics.NewCounter() storageLatency = metrics.NewHistogram(diskLatencyHistogram) diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index 959f3f7845..c4c08a6680 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -21,7 +21,9 @@ import ( ) var ( - testInstant = time.Time{} + // ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20`` + usEastern, _ = time.LoadLocation("US/Eastern") + testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern) ) func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) { diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index ed01cab085..1eccb772bb 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -14,7 +14,6 @@ package metric import ( - "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/utility/test" "sort" @@ -380,9 +379,7 @@ func testMakeView(t test.Tester) { } } - start := time.Now() tiered.Flush() - fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data)) requestBuilder := NewViewRequestBuilder() diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 81346f938c..0499b8daca 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -75,6 +75,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory defer t.Close() p.tester.Fatal(err) } + defer func() { + err 
= persistence.Close() + if err != nil { + p.tester.Fatal(err) + } + }() for f.HasNext() { var ( From c53a72a8944d8b6134eed87b17a8e2677c846c88 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 26 Mar 2013 12:33:48 +0100 Subject: [PATCH 05/16] Test data for the curator. --- Makefile | 4 +- model/fingerprinting.go | 4 + model/watermark.go | 16 + rules/ast/ast.go | 4 +- storage/metric/curator.go | 3 +- storage/metric/curator_test.go | 584 ++++++++++++++++++++------- storage/metric/instrumentation.go | 2 + storage/metric/test_helper.go | 4 +- storage/metric/tiered_test.go | 3 - storage/raw/leveldb/test/fixtures.go | 6 + 10 files changed, 469 insertions(+), 161 deletions(-) diff --git a/Makefile b/Makefile index 9b9582be71..e634862405 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,8 @@ build: $(MAKE) -C model $(MAKE) -C web go build ./... + +binary: build go build -o prometheus.build clean: @@ -44,4 +46,4 @@ search_index: documentation: search_index godoc -http=:6060 -index -index_files='search_index' -.PHONY: advice build clean documentation format search_index test +.PHONY: advice binary build clean documentation format search_index test diff --git a/model/fingerprinting.go b/model/fingerprinting.go index 6dd1257f95..efdd64fa8f 100644 --- a/model/fingerprinting.go +++ b/model/fingerprinting.go @@ -120,6 +120,10 @@ type fingerprint struct { lastCharacterOfLastLabelValue string } +func (f fingerprint) String() string { + return f.ToRowKey() +} + func (f fingerprint) ToRowKey() string { return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter) } diff --git a/model/watermark.go b/model/watermark.go index 93cf47c91d..e6126bd419 100644 --- a/model/watermark.go +++ b/model/watermark.go @@ -14,6 +14,7 @@ package model import ( + "code.google.com/p/goprotobuf/proto" dto "github.com/prometheus/prometheus/model/generated" "time" ) @@ -24,6 +25,14 @@ 
type Watermark struct { time.Time } +// ToMetricHighWatermarkDTO builds a MetricHighWatermark DTO out of a given +// Watermark. +func (w Watermark) ToMetricHighWatermarkDTO() *dto.MetricHighWatermark { + return &dto.MetricHighWatermark{ + Timestamp: proto.Int64(w.Time.Unix()), + } +} + // NewWatermarkFromHighWatermarkDTO builds Watermark from the provided // dto.MetricHighWatermark object. func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { @@ -31,3 +40,10 @@ func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { time.Unix(*d.Timestamp, 0), } } + +// NewWatermarkFromTime builds a new Watermark for the provided time. +func NewWatermarkFromTime(t time.Time) Watermark { + return Watermark{ + t, + } +} diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 95f82c42c0..3be9c49da7 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -177,10 +177,10 @@ type ( type ( // Matrix literal, i.e. metric name plus labelset and timerange. MatrixLiteral struct { - labels model.LabelSet + labels model.LabelSet // Fingerprints are populated from labels at query analysis time. 
fingerprints model.Fingerprints - interval time.Duration + interval time.Duration } ) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index fb3d64646c..62918dc430 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -64,7 +64,8 @@ func (c curator) run() (err error) { var ( decoder watermarkDecoder filter = watermarkFilter{ - stop: c.stop, + stop: c.stop, + curationState: c.curationState, } operator = watermarkOperator{ olderThan: c.cutOff, diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 5adb75c2ee..3cce655582 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -14,184 +14,464 @@ package metric import ( + "code.google.com/p/goprotobuf/proto" "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/raw" - "sort" + dto "github.com/prometheus/prometheus/model/generated" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" ) type ( - keyPair struct { - fingerprint model.Fingerprint - time time.Time + curationState struct { + fingerprint string + groupSize int + olderThan time.Duration + lastCurated time.Time } - fakeCurationStates map[model.Fingerprint]time.Time - fakeSamples map[keyPair][]float32 - fakeWatermarks map[model.Fingerprint]time.Time - - in struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks - cutOff time.Time - grouping uint32 + watermarkState struct { + fingerprint string + lastAppended time.Time } - out struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks + sample struct { + time time.Time + value float32 + } + + sampleGroup struct { + fingerprint string + values []sample + } + + context struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + 
sampleGroups fixture.Pairs } ) -func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (c curationState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), + MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), + OlderThan: proto.Int64(int64(c.olderThan)), + }) -func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeCurationStates) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Close() error { - panic("unimplemented") -} - -func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - var ( - fingerprints model.Fingerprints - ) - - for f := range c { - fingerprints = append(fingerprints, f) - } - - sort.Sort(fingerprints) - - for _, k := range fingerprints { - v := c[k] - - var ( - decodedKey interface{} - decodedValue interface{} - ) - - decodedKey, err = d.DecodeKey(k) - if err != nil { - continue - } - - decodedValue, err = d.DecodeValue(v) - if err != nil { - continue - } - - switch f.Filter(decodedKey, decodedValue) { - case storage.STOP: - return - case storage.SKIP: - continue - case storage.ACCEPT: - opErr := o.Operate(decodedKey, decodedValue) - if opErr != nil { - if opErr.Continuable { - continue - } - break - } - } - } + value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), + }) return } -func (c fakeCurationStates) Commit(_ raw.Batch) error { - panic("unimplemented") +func (w watermarkState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = 
coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + return } -func (c fakeSamples) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (s sampleGroup) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), + Timestamp: indexable.EncodeTime(s.values[0].time), + LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), + SampleCount: proto.Uint32(uint32(len(s.values))), + }) -func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} + series := &dto.SampleValueSeries{} -func (c fakeSamples) Drop(_ coding.Encoder) error { - panic("unimplemented") -} + for _, value := range s.values { + series.Value = append(series.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(value.time.Unix()), + Value: proto.Float32(float32(value.value)), + }) + } -func (c fakeSamples) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} + value = coding.NewProtocolBufferEncoder(series) -func (c fakeSamples) Close() error { - panic("unimplemented") -} - -func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeSamples) Commit(_ raw.Batch) (err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Close() error { - panic("unimplemented") -} - -func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) 
{ - panic("unimplemented") -} - -func (c fakeWatermarks) Commit(_ raw.Batch) (err error) { - panic("unimplemented") + return } func TestCurator(t *testing.T) { var ( scenarios = []struct { - in in - out out + context context }{ { - in: in{ - curationStates: fakeCurationStates{ - model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute), - model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute), + context: context{ + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + groupSize: 5, + olderThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + }, + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 5, + olderThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 55 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 50 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 45 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: 
testInstant.Add(-1 * 40 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 25 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 89 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 88 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 87 * time.Minute), + value: 3, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 86 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 5, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 84 * time.Minute), + value: 6, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 83 * time.Minute), + value: 7, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 82 * time.Minute), + value: 8, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: 
testInstant.Add(-1 * 81 * time.Minute), + value: 9, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 10, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 79 * time.Minute), + value: 11, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 78 * time.Minute), + value: 12, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 77 * time.Minute), + value: 13, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 76 * time.Minute), + value: 14, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 15, + }, + { + time: testInstant.Add(-1 * 74 * time.Minute), + value: 16, + }, + { + time: testInstant.Add(-1 * 73 * time.Minute), + value: 17, + }, + { + time: testInstant.Add(-1 * 72 * time.Minute), + value: 18, + }, + { + time: testInstant.Add(-1 * 71 * time.Minute), + value: 19, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 20, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 69 * time.Minute), + value: 21, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 68 * time.Minute), + value: 22, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 67 * time.Minute), + value: 23, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 66 * time.Minute), + value: 24, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 25, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + 
time: testInstant.Add(-1 * 64 * time.Minute), + value: 26, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 63 * time.Minute), + value: 27, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 62 * time.Minute), + value: 28, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 61 * time.Minute), + value: 29, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 30, + }, + }, + }, }, - watermarks: fakeWatermarks{}, - samples: fakeSamples{}, - cutOff: testInstant.Add(5 * time.Minute), - grouping: 5, }, }, } @@ -199,15 +479,13 @@ func TestCurator(t *testing.T) { for _, scenario := range scenarios { var ( - in = scenario.in - - curationStates = in.curationStates - samples = in.samples - watermarks = in.watermarks - cutOff = in.cutOff - grouping = in.grouping + curatorDirectory = fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + watermarkDirectory = fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + sampleDirectory = fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) ) + defer curatorDirectory.Close() + defer watermarkDirectory.Close() + defer sampleDirectory.Close() - _ = newCurator(cutOff, grouping, curationStates, samples, watermarks) } } diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 42979a6a49..cb3381fb03 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -63,6 +63,8 @@ var ( ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, } + curationDuration = metrics.NewCounter() + curationDurations = metrics.NewHistogram(diskLatencyHistogram) storageOperations = 
metrics.NewCounter() storageOperationDurations = metrics.NewCounter() storageLatency = metrics.NewHistogram(diskLatencyHistogram) diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index 959f3f7845..c4c08a6680 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -21,7 +21,9 @@ import ( ) var ( - testInstant = time.Time{} + // ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20`` + usEastern, _ = time.LoadLocation("US/Eastern") + testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern) ) func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) { diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index ed01cab085..1eccb772bb 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -14,7 +14,6 @@ package metric import ( - "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/utility/test" "sort" @@ -380,9 +379,7 @@ func testMakeView(t test.Tester) { } } - start := time.Now() tiered.Flush() - fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data)) requestBuilder := NewViewRequestBuilder() diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 81346f938c..0499b8daca 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -75,6 +75,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory defer t.Close() p.tester.Fatal(err) } + defer func() { + err = persistence.Close() + if err != nil { + p.tester.Fatal(err) + } + }() for f.HasNext() { var ( From 2668700e549e7e607a0d66a88987919b7214e0aa Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 27 Mar 2013 18:50:30 +0100 Subject: [PATCH 06/16] Fix bug in GetFingerprintsForLabelSet(). 
--- storage/metric/tiered.go | 2 +- storage/metric/tiered_test.go | 61 +++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index c8dc40d1e3..eabcc99bd6 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -562,7 +562,7 @@ func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fin if err != nil { return } - diskFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) + diskFingerprints, err := t.diskStorage.GetFingerprintsForLabelSet(labelSet) if err != nil { return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index ed01cab085..5293a75b88 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -530,3 +530,64 @@ func TestGetAllValuesForLabel(t *testing.T) { } } } + +func TestGetFingerprintsForLabelSet(t *testing.T) { + tiered, closer := newTestTieredStorage(t) + defer closer.Close() + memorySample := model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/foo"}, + } + diskSample := model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "http_requests", "method": "/bar"}, + } + if err := tiered.(*tieredStorage).memoryArena.AppendSample(memorySample); err != nil { + t.Fatalf("Failed to add fixture data: %s", err) + } + if err := tiered.(*tieredStorage).diskStorage.AppendSample(diskSample); err != nil { + t.Fatalf("Failed to add fixture data: %s", err) + } + tiered.Flush() + + scenarios := []struct { + labels model.LabelSet + fpCount int + }{ + { + labels: model.LabelSet{}, + fpCount: 0, + },{ + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + }, + fpCount: 2, + }, { + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + "method": "/foo", + }, + fpCount: 1, + }, { + labels: model.LabelSet{ + model.MetricNameLabel: "http_requests", + "method": "/bar", + }, + fpCount: 1, + }, { + labels: 
model.LabelSet{ + model.MetricNameLabel: "http_requests", + "method": "/baz", + }, + fpCount: 0, + }, + } + + for i, scenario := range scenarios { + fingerprints, err := tiered.GetFingerprintsForLabelSet(scenario.labels) + if err != nil { + t.Fatalf("%d. Error getting metric names: %s", i, err) + } + if len(fingerprints) != scenario.fpCount { + t.Fatalf("%d. Expected metric count %d, got %d", i, scenario.fpCount, len(fingerprints)) + } + } +} From a9640c0203c153987ef8a84fea8023d48b972113 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 28 Mar 2013 10:38:26 +0100 Subject: [PATCH 07/16] Update community documentation. Include mailing list notes and additional contributors. Include generated documentation. Correct title case. --- CONTRIBUTORS.md | 2 ++ README.md | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index fc5cbd19c5..ebcdb45bb2 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,3 +1,5 @@ # Prometheus Team +- Johannes Ziemke - Julius Volz - Matt T. Proud +- Tobias Schmidt diff --git a/README.md b/README.md index 8f1ace54a5..b29028f8e7 100644 --- a/README.md +++ b/README.md @@ -24,12 +24,14 @@ action if some condition is observed to be true. 7. Prometheus Client, Prometheus in Prometheus (https://github.com/prometheus/client_golang). 8. Snappy, a compression library for LevelDB and Levigo (http://code.google.com/p/snappy/). -## Getting started +## Getting Started For basic help how to get started: - * For Linux users, please consult the Travis CI configuration in _.travis.yml_. + * The source code is periodically indexed: [Prometheus Core](http://godoc.org/github.com/prometheus/prometheus). + * For Linux users, please consult the Travis CI configuration in _.travis.yml_ and _Makefile.TRAVIS_. 
* [Getting started on Mac OSX](documentation/guides/getting-started-osx.md) + * All of the core developers are accessible via the [Prometheus Developers Mailinglist](https://groups.google.com/forum/?fromgroups#!forum/prometheus-developers). ## License From c59f3fc5387483acccdb851edf3358c7a62c343a Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 28 Mar 2013 12:16:31 +0100 Subject: [PATCH 08/16] Fix formatting in tiered_test.go. --- storage/metric/tiered_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 370502e452..fbd9143b19 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -550,9 +550,9 @@ func TestGetFingerprintsForLabelSet(t *testing.T) { fpCount int }{ { - labels: model.LabelSet{}, + labels: model.LabelSet{}, fpCount: 0, - },{ + }, { labels: model.LabelSet{ model.MetricNameLabel: "http_requests", }, @@ -560,19 +560,19 @@ func TestGetFingerprintsForLabelSet(t *testing.T) { }, { labels: model.LabelSet{ model.MetricNameLabel: "http_requests", - "method": "/foo", + "method": "/foo", }, fpCount: 1, }, { labels: model.LabelSet{ model.MetricNameLabel: "http_requests", - "method": "/bar", + "method": "/bar", }, fpCount: 1, }, { labels: model.LabelSet{ model.MetricNameLabel: "http_requests", - "method": "/baz", + "method": "/baz", }, fpCount: 0, }, From 9d46b941ea17cd6add2cbc25a5ce4833f3f7de5f Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 28 Mar 2013 15:54:40 +0100 Subject: [PATCH 09/16] Include Travis status. I am going to regret this. 
--- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index b29028f8e7..fe45014180 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,10 @@ For basic help how to get started: * [Getting started on Mac OSX](documentation/guides/getting-started-osx.md) * All of the core developers are accessible via the [Prometheus Developers Mailinglist](https://groups.google.com/forum/?fromgroups#!forum/prometheus-developers). +## Testing + +[![Build Status](https://travis-ci.org/prometheus/prometheus.png)](https://travis-ci.org/prometheus/prometheus) + ## License Apache License 2.0 From 676845afafbbca285502a2fb7d42313e07029742 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 28 Mar 2013 16:41:51 +0100 Subject: [PATCH 10/16] Implement sample interpolation in query layer. --- rules/ast/persistence_adapter.go | 75 ++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 18 deletions(-) diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 7b136b32ca..2c430eaa72 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -27,33 +27,72 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal var queryStorage metric.Storage = nil type viewAdapter struct { - view metric.View - // TODO: use this. + view metric.View stalenessPolicy *metric.StalenessPolicy } -func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) { - var minDelta time.Duration - for _, candidate := range samples { - // Ignore samples outside of staleness policy window. - delta := candidate.Timestamp.Sub(*timestamp) - if delta < 0 { - delta = -delta - } - if delta > v.stalenessPolicy.DeltaAllowance { - continue - } +// interpolateSamples interpolates a value at a target time between two +// provided sample pairs. 
+func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *model.SamplePair { + dv := second.Value - first.Value + dt := second.Timestamp.Sub(first.Timestamp) - // Skip sample if we've seen a closer one before this. - if sample != nil { - if delta > minDelta { + dDt := dv / model.SampleValue(dt) + offset := model.SampleValue(timestamp.Sub(first.Timestamp)) + + return &model.SamplePair{ + Value: first.Value + (offset * dDt), + Timestamp: timestamp, + } +} + +// chooseClosestSample chooses the closest sample of a list of samples +// surrounding a given target time. If samples are found both before and after +// the target time, the sample value is interpolated between these. Otherwise, +// the single closest sample is returned verbatim. +func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) { + var closestBefore *model.SamplePair + var closestAfter *model.SamplePair + for _, candidate := range samples { + delta := candidate.Timestamp.Sub(*timestamp) + // Samples before target time. + if delta < 0 { + // Ignore samples outside of staleness policy window. + if -delta > v.stalenessPolicy.DeltaAllowance { continue } + // Ignore samples that are farther away than what we've seen before. + if closestBefore != nil && candidate.Timestamp.Before(closestBefore.Timestamp) { + continue + } + sample := candidate + closestBefore = &sample } - sample = &candidate - minDelta = delta + // Samples after target time. + if delta >= 0 { + // Ignore samples outside of staleness policy window. + if delta > v.stalenessPolicy.DeltaAllowance { + continue + } + // Ignore samples that are farther away than samples we've seen before. 
+ if closestAfter != nil && candidate.Timestamp.After(closestAfter.Timestamp) { + continue + } + sample := candidate + closestAfter = &sample + } } + + switch { + case closestBefore != nil && closestAfter != nil: + sample = interpolateSamples(closestBefore, closestAfter, *timestamp) + case closestBefore != nil: + sample = closestBefore + default: + sample = closestAfter + } + return } From ec413459fa17d602735d648f158af8aa97b6251a Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 28 Mar 2013 17:05:06 +0100 Subject: [PATCH 11/16] Depointerize Matrix/Vector types as well as time.Time arguments. --- main.go | 2 +- rules/ast/ast.go | 52 ++++++++++++++++---------------- rules/ast/functions.go | 44 +++++++++++++-------------- rules/ast/persistence_adapter.go | 22 +++++++------- rules/ast/printer.go | 4 +-- rules/rules_test.go | 2 +- rules/testdata.go | 2 +- web/api/query.go | 2 +- 8 files changed, 65 insertions(+), 65 deletions(-) diff --git a/main.go b/main.go index 6ce8aadfa6..2f3cd78fa8 100644 --- a/main.go +++ b/main.go @@ -95,7 +95,7 @@ func main() { case ruleResult := <-ruleResults: for _, sample := range ruleResult.Samples { - ts.AppendSample(*sample) + ts.AppendSample(sample) } } } diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 3be9c49da7..bfc4d39f24 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -27,8 +27,8 @@ import ( // ---------------------------------------------------------------------------- // Raw data value types. -type Vector []*model.Sample -type Matrix []*model.SampleSet +type Vector model.Samples +type Matrix []model.SampleSet type groupedAggregation struct { labels model.Metric @@ -97,23 +97,23 @@ type Node interface { // interface represents the type returned to the parent node. 
type ScalarNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue + Eval(timestamp time.Time, view *viewAdapter) model.SampleValue } type VectorNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) Vector + Eval(timestamp time.Time, view *viewAdapter) Vector } type MatrixNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) Matrix - EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix + Eval(timestamp time.Time, view *viewAdapter) Matrix + EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix } type StringNode interface { Node - Eval(timestamp *time.Time, view *viewAdapter) string + Eval(timestamp time.Time, view *viewAdapter) string } // ---------------------------------------------------------------------------- @@ -227,17 +227,17 @@ func (node MatrixLiteral) Children() []Node { return []Node{} } func (node StringLiteral) Children() []Node { return []Node{} } func (node StringFunctionCall) Children() []Node { return node.args } -func (node *ScalarLiteral) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { +func (node *ScalarLiteral) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue { return node.value } -func (node *ScalarArithExpr) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { +func (node *ScalarArithExpr) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue { lhs := node.lhs.Eval(timestamp, view) rhs := node.rhs.Eval(timestamp, view) return evalScalarBinop(node.opType, lhs, rhs) } -func (node *ScalarFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { +func (node *ScalarFunctionCall) Eval(timestamp time.Time, view *viewAdapter) model.SampleValue { return node.function.callFn(timestamp, view, node.args).(model.SampleValue) } @@ -263,7 +263,7 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector) { if err != nil { return } - return node.Eval(&timestamp, viewAdapter) + return 
node.Eval(timestamp, viewAdapter) } func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) { @@ -278,7 +278,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t // TODO implement watchdog timer for long-running queries. sampleSets := map[string]*model.SampleSet{} for t := start; t.Before(end); t = t.Add(interval) { - vector := node.Eval(&t, viewAdapter) + vector := node.Eval(t, viewAdapter) for _, sample := range vector { samplePair := model.SamplePair{ Value: sample.Value, @@ -297,7 +297,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t } for _, sampleSet := range sampleSets { - matrix = append(matrix, sampleSet) + matrix = append(matrix, *sampleSet) } return } @@ -312,23 +312,23 @@ func labelIntersection(metric1, metric2 model.Metric) model.Metric { return intersection } -func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp *time.Time) Vector { +func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp time.Time) Vector { vector := Vector{} for _, aggregation := range aggregations { if node.aggrType == AVG { aggregation.value = aggregation.value / model.SampleValue(aggregation.groupCount) } - sample := &model.Sample{ + sample := model.Sample{ Metric: aggregation.labels, Value: aggregation.value, - Timestamp: *timestamp, + Timestamp: timestamp, } vector = append(vector, sample) } return vector } -func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorAggregation) Eval(timestamp time.Time, view *viewAdapter) Vector { vector := node.vector.Eval(timestamp, view) result := map[string]*groupedAggregation{} for _, sample := range vector { @@ -361,7 +361,7 @@ func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vec return 
node.groupedAggregationsToVector(result, timestamp) } -func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorLiteral) Eval(timestamp time.Time, view *viewAdapter) Vector { values, err := view.GetValueAtTime(node.fingerprints, timestamp) if err != nil { log.Printf("Unable to get vector values") @@ -370,7 +370,7 @@ func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector return values } -func (node *VectorFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorFunctionCall) Eval(timestamp time.Time, view *viewAdapter) Vector { return node.function.callFn(timestamp, view, node.args).(Vector) } @@ -514,7 +514,7 @@ func labelsEqual(labels1, labels2 model.Metric) bool { return true } -func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vector { +func (node *VectorArithExpr) Eval(timestamp time.Time, view *viewAdapter) Vector { lhs := node.lhs.Eval(timestamp, view) result := Vector{} if node.rhs.Type() == SCALAR { @@ -545,10 +545,10 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vecto panic("Invalid vector arithmetic expression operands") } -func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix { +func (node *MatrixLiteral) Eval(timestamp time.Time, view *viewAdapter) Matrix { interval := &model.Interval{ OldestInclusive: timestamp.Add(-node.interval), - NewestInclusive: *timestamp, + NewestInclusive: timestamp, } values, err := view.GetRangeValues(node.fingerprints, interval) if err != nil { @@ -558,10 +558,10 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix return values } -func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix { +func (node *MatrixLiteral) EvalBoundaries(timestamp time.Time, view *viewAdapter) Matrix { interval := &model.Interval{ OldestInclusive: timestamp.Add(-node.interval), - NewestInclusive: 
*timestamp, + NewestInclusive: timestamp, } values, err := view.GetBoundaryValues(node.fingerprints, interval) if err != nil { @@ -583,11 +583,11 @@ func (matrix Matrix) Swap(i, j int) { matrix[i], matrix[j] = matrix[j], matrix[i] } -func (node *StringLiteral) Eval(timestamp *time.Time, view *viewAdapter) string { +func (node *StringLiteral) Eval(timestamp time.Time, view *viewAdapter) string { return node.str } -func (node *StringFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) string { +func (node *StringFunctionCall) Eval(timestamp time.Time, view *viewAdapter) string { return node.function.callFn(timestamp, view, node.args).(string) } diff --git a/rules/ast/functions.go b/rules/ast/functions.go index 55356c9775..4cf5047c9d 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -24,7 +24,7 @@ type Function struct { name string argTypes []ExprType returnType ExprType - callFn func(timestamp *time.Time, view *viewAdapter, args []Node) interface{} + callFn func(timestamp time.Time, view *viewAdapter, args []Node) interface{} } func (function *Function) CheckArgTypes(args []Node) error { @@ -63,17 +63,17 @@ func (function *Function) CheckArgTypes(args []Node) error { } // === time() === -func timeImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func timeImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { return model.SampleValue(time.Now().Unix()) } // === count(vector VectorNode) === -func countImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func countImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view))) } // === delta(matrix MatrixNode, isCounter ScalarNode) === -func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { matrixNode := args[0].(MatrixNode) isCounter := 
int(args[1].(ScalarNode).Eval(timestamp, view)) resultVector := Vector{} @@ -98,10 +98,10 @@ func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} lastValue = currentValue } resultValue := lastValue - samples.Values[0].Value + counterCorrection - resultSample := &model.Sample{ + resultSample := model.Sample{ Metric: samples.Metric, Value: resultValue, - Timestamp: *timestamp, + Timestamp: timestamp, } resultVector = append(resultVector, resultSample) } @@ -109,7 +109,7 @@ func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} } // === rate(node *MatrixNode) === -func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func rateImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { args = append(args, &ScalarLiteral{value: 1}) vector := deltaImpl(timestamp, view, args).(Vector) @@ -124,36 +124,36 @@ func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} } // === sampleVectorImpl() === -func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { +func sampleVectorImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { return Vector{ - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", "instance": "0", }, Value: 10, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", "instance": "1", }, Value: 20, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", "instance": "2", }, Value: 30, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -161,9 +161,9 @@ func sampleVectorImpl(timestamp *time.Time, view 
*viewAdapter, args []Node) inte "group": "canary", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -171,9 +171,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "canary", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -181,9 +181,9 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "mytest", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, - &model.Sample{ + model.Sample{ Metric: model.Metric{ model.MetricNameLabel: "http_requests", "job": "api-server", @@ -191,7 +191,7 @@ func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) inte "group": "mytest", }, Value: 40, - Timestamp: *timestamp, + Timestamp: timestamp, }, } } diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 2c430eaa72..682f643e66 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -50,11 +50,11 @@ func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *m // surrounding a given target time. If samples are found both before and after // the target time, the sample value is interpolated between these. Otherwise, // the single closest sample is returned verbatim. 
-func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) { +func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp time.Time) (sample *model.SamplePair) { var closestBefore *model.SamplePair var closestAfter *model.SamplePair for _, candidate := range samples { - delta := candidate.Timestamp.Sub(*timestamp) + delta := candidate.Timestamp.Sub(timestamp) // Samples before target time. if delta < 0 { // Ignore samples outside of staleness policy window. @@ -86,7 +86,7 @@ func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp switch { case closestBefore != nil && closestAfter != nil: - sample = interpolateSamples(closestBefore, closestAfter, *timestamp) + sample = interpolateSamples(closestBefore, closestAfter, timestamp) case closestBefore != nil: sample = closestBefore default: @@ -96,26 +96,26 @@ func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp return } -func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp *time.Time) (samples []*model.Sample, err error) { +func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) { for _, fingerprint := range fingerprints { - sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp) + sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp) m, err := queryStorage.GetMetricForFingerprint(fingerprint) if err != nil { continue } if samplePair != nil { - samples = append(samples, &model.Sample{ + samples = append(samples, model.Sample{ Metric: *m, Value: samplePair.Value, - Timestamp: *timestamp, + Timestamp: timestamp, }) } } return } -func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { +func (v *viewAdapter) 
GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { for _, fingerprint := range fingerprints { // TODO: change to GetBoundaryValues() once it has the right return type. samplePairs := v.view.GetRangeValues(fingerprint, *interval) @@ -129,7 +129,7 @@ func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interva continue } - sampleSet := &model.SampleSet{ + sampleSet := model.SampleSet{ Metric: *m, Values: samplePairs, } @@ -138,7 +138,7 @@ func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interva return sampleSets, nil } -func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { +func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { for _, fingerprint := range fingerprints { samplePairs := v.view.GetRangeValues(fingerprint, *interval) if samplePairs == nil { @@ -151,7 +151,7 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval * continue } - sampleSet := &model.SampleSet{ + sampleSet := model.SampleSet{ Metric: *m, Values: samplePairs, } diff --git a/rules/ast/printer.go b/rules/ast/printer.go index 17062bda2c..2864361dce 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -152,8 +152,8 @@ func TypedValueToJSON(data interface{}, typeStr string) string { return string(dataJSON) } -func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { - viewAdapter, err := viewAdapterForInstantQuery(node, *timestamp) +func EvalToString(node Node, timestamp time.Time, format OutputFormat) string { + viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) if err != nil { panic(err) } diff --git a/rules/rules_test.go b/rules/rules_test.go index 43e8ae5540..bda1cb1963 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -241,7 
+241,7 @@ func TestExpressions(t *testing.T) { t.Errorf("Test should fail, but didn't") } failed := false - resultStr := ast.EvalToString(testExpr, &testEvalTime, ast.TEXT) + resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT) resultLines := strings.Split(resultStr, "\n") if len(exprTest.output) != len(resultLines) { diff --git a/rules/testdata.go b/rules/testdata.go index 7e853f5946..deeb0a54dc 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -42,7 +42,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { vector := ast.Vector{} for _, sampleSet := range matrix { lastSample := sampleSet.Values[len(sampleSet.Values)-1] - vector = append(vector, &model.Sample{ + vector = append(vector, model.Sample{ Metric: sampleSet.Metric, Value: lastSample.Value, Timestamp: lastSample.Timestamp, diff --git a/web/api/query.go b/web/api/query.go index 3021c1a780..5b39e7a241 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -44,7 +44,7 @@ func (serv MetricsService) Query(expr string, formatJson string) (result string) rb.SetContentType(gorest.Text_Plain) } - return ast.EvalToString(exprNode, &timestamp, format) + return ast.EvalToString(exprNode, timestamp, format) } func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string { From 8b91bffa1da0db2e463dc0136f49fd1c3c25908e Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 28 Mar 2013 18:17:50 +0100 Subject: [PATCH 12/16] Make Travis test runs verbose. --- Makefile | 2 +- Makefile.TRAVIS | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e634862405..376d52e871 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ TEST_ARTIFACTS = prometheus prometheus.build search_index all: test test: build - go test ./... + go test ./... 
$(GO_TEST_FLAGS) build: $(MAKE) -C model diff --git a/Makefile.TRAVIS b/Makefile.TRAVIS index 74402dcdde..c802995186 100644 --- a/Makefile.TRAVIS +++ b/Makefile.TRAVIS @@ -28,6 +28,8 @@ export LDFLAGS := $(LDFLAGS) -L$(OVERLAY_ROOT)/lib export CGO_CFLAGS := $(CFLAGS) -lsnappy export CGO_LDFLAGS := $(LDFLAGS) +export GO_TEST_FLAGS := "-v" + GO_GET := go get -u -v -x APT_GET_INSTALL := sudo apt-get install -y WGET := wget -c From e31591e6fe62a7a95f479b015c0b13c1ae4d98d9 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 28 Mar 2013 18:14:41 +0100 Subject: [PATCH 13/16] Allow single-letter identifiers (metric and label names). --- rules/lexer.l | 2 +- rules/lexer.l.go | 2 +- rules/rules_test.go | 7 +++++++ rules/testdata.go | 8 ++++++++ 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/rules/lexer.l b/rules/lexer.l index 643f52fc6d..1edfcc6b09 100644 --- a/rules/lexer.l +++ b/rules/lexer.l @@ -45,7 +45,7 @@ AVG|SUM|MAX|MIN { yylval.str = yytext; return AGGR_OP } [*/%] { yylval.str = yytext; return MULT_OP } {D}+{U} { yylval.str = yytext; return DURATION } -{L}({L}|{D})+ { yylval.str = yytext; return IDENTIFIER } +{L}({L}|{D})* { yylval.str = yytext; return IDENTIFIER } \-?{D}+(\.{D}*)? 
{ num, err := strconv.ParseFloat(yytext, 32); if (err != nil && err.(*strconv.NumError).Err == strconv.ErrSyntax) { diff --git a/rules/lexer.l.go b/rules/lexer.l.go index 7ec712fd63..0671c45afb 100644 --- a/rules/lexer.l.go +++ b/rules/lexer.l.go @@ -411,7 +411,7 @@ var yyrules []yyrule = []yyrule{{regexp.MustCompile("[^\\n]"), nil, []yystartcon return yyactionreturn{DURATION, yyRT_USER_RETURN} } return yyactionreturn{0, yyRT_FALLTHROUGH} -}}, {regexp.MustCompile("([a-zA-Z_:])(([a-zA-Z_:])|([0-9]))+"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { +}}, {regexp.MustCompile("([a-zA-Z_:])(([a-zA-Z_:])|([0-9]))*"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { defer func() { if r := recover(); r != nil { if r != "yyREJECT" { diff --git a/rules/rules_test.go b/rules/rules_test.go index bda1cb1963..1b928ffd92 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -179,6 +179,13 @@ var expressionTests = []struct { }, fullRanges: 8, intervalRanges: 0, + }, { + expr: "x{y='testvalue'}", + output: []string{ + "x{y='testvalue'} => 100 @[%v]", + }, + fullRanges: 0, + intervalRanges: 1, // Invalid expressions that should fail to parse. }, { expr: "", diff --git a/rules/testdata.go b/rules/testdata.go index deeb0a54dc..7a900bab05 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -140,6 +140,14 @@ var testMatrix = ast.Matrix{ }, Values: getTestValueStream(0, 800, 80), }, + // Single-letter metric and label names. + { + Metric: model.Metric{ + model.MetricNameLabel: "x", + "y": "testvalue", + }, + Values: getTestValueStream(0, 100, 10), + }, } var testVector = getTestVectorFromTestMatrix(testMatrix) From 7764682c7cf3affc85bb3e8c372720336518735a Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 2 Apr 2013 15:20:30 +0200 Subject: [PATCH 14/16] Update to use new default handler. 
--- web/web.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/web/web.go b/web/web.go index e3672a707e..f754a5b96b 100644 --- a/web/web.go +++ b/web/web.go @@ -32,11 +32,10 @@ var ( func StartServing(appState *appstate.ApplicationState) { gorest.RegisterService(api.NewMetricsService(appState)) - exporter := registry.DefaultRegistry.YieldExporter() http.Handle("/status", &StatusHandler{appState: appState}) http.Handle("/api/", gorest.Handle()) - http.Handle("/metrics.json", exporter) + http.Handle("/metrics.json", registry.DefaultHandler) if *useLocalAssets { http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static")))) } else { From d0ad6cbeaabfbdc28cdc504f9a276d3a5d628453 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 3 Apr 2013 12:09:05 +0200 Subject: [PATCH 15/16] Spin up storage layers for made fixtures. --- storage/metric/curator_test.go | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 3cce655582..7757ae4f53 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw/leveldb" fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" @@ -478,14 +479,32 @@ func TestCurator(t *testing.T) { ) for _, scenario := range scenarios { - var ( - curatorDirectory = fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) - watermarkDirectory = fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) - sampleDirectory = fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) - ) + 
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) defer curatorDirectory.Close() + + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) defer watermarkDirectory.Close() + + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) defer sampleDirectory.Close() + curatorState, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer curatorState.Close() + + watermarkState, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer watermarkState.Close() + + samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer samples.Close() + } } From c3e3460ca6e23ef8f9041150591c51b883fcbacd Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 5 Apr 2013 13:07:13 +0200 Subject: [PATCH 16/16] Spin up curator run in the tests. After this commit, we'll need to add validations that it does the desired work, which we presently know that it doesn't. Given the changes I made with a plethora of renamings, I want to commit this now before it gets even larger. 
--- coding/protocol_buffer.go | 13 +++-- storage/metric/.gitignore | 1 + storage/metric/curator.go | 63 ++++++++++++---------- storage/metric/curator_test.go | 79 ++++++++++++++++------------ storage/metric/frontier.go | 4 +- storage/metric/leveldb.go | 32 +++++------ storage/metric/tiered.go | 2 +- storage/raw/index/leveldb/leveldb.go | 2 +- storage/raw/leveldb/test/fixtures.go | 16 +----- 9 files changed, 112 insertions(+), 100 deletions(-) create mode 100644 storage/metric/.gitignore diff --git a/coding/protocol_buffer.go b/coding/protocol_buffer.go index 273395207e..19c8bef150 100644 --- a/coding/protocol_buffer.go +++ b/coding/protocol_buffer.go @@ -15,13 +15,14 @@ package coding import ( "code.google.com/p/goprotobuf/proto" + "fmt" ) -type ProtocolBufferEncoder struct { +type ProtocolBuffer struct { message proto.Message } -func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { +func (p ProtocolBuffer) Encode() (raw []byte, err error) { raw, err = proto.Marshal(p.message) // XXX: Adjust legacy users of this to not check for error. 
@@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { return } -func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { - return &ProtocolBufferEncoder{ +func (p ProtocolBuffer) String() string { + return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message) +} + +func NewProtocolBuffer(message proto.Message) *ProtocolBuffer { + return &ProtocolBuffer{ message: message, } } diff --git a/storage/metric/.gitignore b/storage/metric/.gitignore new file mode 100644 index 0000000000..3460f0346d --- /dev/null +++ b/storage/metric/.gitignore @@ -0,0 +1 @@ +command-line-arguments.test diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 62918dc430..592f4ae876 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -37,8 +37,9 @@ type curator struct { // watermarks is the on-disk store that is scanned for high watermarks for // given metrics. watermarks raw.Persistence - // cutOff represents the most recent time up to which values will be curated. - cutOff time.Time + // recencyThreshold represents the most recent time up to which values will be + // curated. + recencyThreshold time.Duration // groupingQuantity represents the number of samples below which encountered // samples will be dismembered and reaggregated into larger groups. groupingQuantity uint32 @@ -48,9 +49,9 @@ type curator struct { } // newCurator builds a new curator for the given LevelDB databases. 
-func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { +func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { return curator{ - cutOff: cutOff, + recencyThreshold: recencyThreshold, stop: make(chan bool), samples: samples, curationState: curationState, @@ -60,19 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample } // run facilitates the curation lifecycle. -func (c curator) run() (err error) { - var ( - decoder watermarkDecoder - filter = watermarkFilter{ - stop: c.stop, - curationState: c.curationState, - } - operator = watermarkOperator{ - olderThan: c.cutOff, - groupSize: c.groupingQuantity, - curationState: c.curationState, - } - ) +func (c curator) run(instant time.Time) (err error) { + decoder := watermarkDecoder{} + filter := watermarkFilter{ + stop: c.stop, + curationState: c.curationState, + groupSize: c.groupingQuantity, + recencyThreshold: c.recencyThreshold, + } + operator := watermarkOperator{ + olderThan: instant.Add(-1 * c.recencyThreshold), + groupSize: c.groupingQuantity, + curationState: c.curationState, + } _, err = c.watermarks.ForEach(decoder, filter, operator) @@ -126,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro // watermarkFilter determines whether to include or exclude candidate // values from the curation process by virtue of how old the high watermark is. type watermarkFilter struct { - // curationState is the table of CurationKey to CurationValues that remark on + // curationState is the table of CurationKey to CurationValues that rema // far along the curation process has gone for a given metric fingerprint. curationState raw.Persistence // stop, when non-empty, instructs the filter to stop operation. stop chan bool + // groupSize refers to the target groupSize from the curator. 
+ groupSize uint32 + // recencyThreshold refers to the target recencyThreshold from the curator. + recencyThreshold time.Duration } func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { - var ( - fingerprint = key.(model.Fingerprint) - watermark = value.(model.Watermark) - curationKey = fingerprint.ToDTO() - rawCurationValue []byte - err error - curationValue = &dto.CurationValue{} - ) + fingerprint := key.(model.Fingerprint) + watermark := value.(model.Watermark) + curationKey := &dto.CurationKey{ + Fingerprint: fingerprint.ToDTO(), + MinimumGroupSize: proto.Uint32(w.groupSize), + OlderThan: proto.Int64(int64(w.recencyThreshold)), + } + curationValue := &dto.CurationValue{} - rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { panic(err) } @@ -229,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er MinimumGroupSize: proto.Uint32(w.groupSize), } - curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey)) + curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey)) return } @@ -247,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod } ) - rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { return } diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 7757ae4f53..c950ffb42f 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -27,10 +27,10 @@ import ( type ( curationState struct { - fingerprint string - groupSize int - olderThan time.Duration - lastCurated time.Time + fingerprint string + groupSize int + recencyThreshold time.Duration + lastCurated time.Time } watermarkState struct { @@ -48,21 
+48,23 @@ type ( values []sample } - context struct { - curationStates fixture.Pairs - watermarkStates fixture.Pairs - sampleGroups fixture.Pairs + in struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs + recencyThreshold time.Duration + groupSize uint32 } ) func (c curationState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + key = coding.NewProtocolBuffer(&dto.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), - OlderThan: proto.Int64(int64(c.olderThan)), + OlderThan: proto.Int64(int64(c.recencyThreshold)), }) - value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + value = coding.NewProtocolBuffer(&dto.CurationValue{ LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), }) @@ -70,13 +72,13 @@ func (c curationState) Get() (key, value coding.Encoder) { } func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) return } func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + key = coding.NewProtocolBuffer(&dto.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), Timestamp: indexable.EncodeTime(s.values[0].time), LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), @@ -92,7 +94,7 @@ func (s sampleGroup) Get() (key, value coding.Encoder) { }) } - value = coding.NewProtocolBufferEncoder(series) + value = coding.NewProtocolBuffer(series) return } @@ -100,22 +102,31 @@ func (s 
sampleGroup) Get() (key, value coding.Encoder) { func TestCurator(t *testing.T) { var ( scenarios = []struct { - context context + in in }{ { - context: context{ + in: in{ + recencyThreshold: 1 * time.Hour, + groupSize: 5, curationStates: fixture.Pairs{ curationState{ - fingerprint: "0001-A-1-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 30 * time.Minute), + fingerprint: "0001-A-1-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), }, curationState{ - fingerprint: "0002-A-2-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 90 * time.Minute), + fingerprint: "0002-A-2-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + // This rule should effectively be ignored. + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 2, + recencyThreshold: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), }, }, watermarkStates: fixture.Pairs{ @@ -124,7 +135,7 @@ func TestCurator(t *testing.T) { lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, watermarkState{ - fingerprint: "0002-A-1-Z", + fingerprint: "0002-A-2-Z", lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, }, @@ -479,26 +490,26 @@ func TestCurator(t *testing.T) { ) for _, scenario := range scenarios { - curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) defer curatorDirectory.Close() - watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) defer watermarkDirectory.Close() - sampleDirectory := 
fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorState, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer curatorState.Close() + defer curatorStates.Close() - watermarkState, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer watermarkState.Close() + defer watermarkStates.Close() samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) if err != nil { @@ -506,5 +517,7 @@ func TestCurator(t *testing.T) { } defer samples.Close() + c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates) + c.run(testInstant) } } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 9c4183902d..ecc5c93215 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) Timestamp: upperSeek, } - raw, err := coding.NewProtocolBufferEncoder(key).Encode() + raw, err := coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } @@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) key.Timestamp = lowerSeek - raw, err = coding.NewProtocolBufferEncoder(key).Encode() + raw, err = coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 654ae978b9..d1d59f98d1 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -339,7 +339,7 @@ func (l 
*LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelNameToFingerprints.Commit(batch) @@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelSetToFingerprints.Commit(batch) @@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(fingerprint.ToDTO()) + value := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, value) } @@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // WART: We should probably encode simple fingerprints. 
for _, metric := range absentMetrics { - key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger value = &dto.MetricHighWatermark{} raw []byte newestSampleTimestamp = samples[len(samples)-1].Timestamp - keyEncoded = coding.NewProtocolBufferEncoder(key) + keyEncoded = coding.NewProtocolBuffer(key) ) key.Signature = proto.String(fingerprint.ToRowKey()) @@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } } value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value)) + batch.Put(keyEncoded, coding.NewProtocolBuffer(value)) mutationCount++ } @@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } } @@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.metricMembershipIndex.Has(dtoKey) return @@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) return @@ -782,7 +782,7 @@ func (l 
*LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) return @@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)) + f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO)) if err != nil { return fps, err } @@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() - raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) + raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) if err != nil { return } @@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() - raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) + raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) if err != nil { return } @@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T Timestamp: indexable.EncodeTime(t), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := 
coding.NewProtocolBuffer(k).Encode() if err != nil { return } @@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index c8dc40d1e3..daad234a84 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier } // Try seeking to target key. - rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode() iterator.Seek(rawKey) foundKey, err := extractSampleKey(iterator) diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index e00152d3da..a877a6d87a 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -21,7 +21,7 @@ import ( ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{}) + existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 0499b8daca..2cb8d35d9c 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -15,7 +15,6 @@ package test import ( "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility/test" ) @@ -64,13 +63,7 @@ type ( func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { t = test.NewTemporaryDirectory(n, p.tester) - - var ( - persistence raw.Persistence - err error - ) - - persistence, err = leveldb.NewLevelDBPersistence(t.Path(), 
cacheCapacity, bitsPerBloomFilterEncoded) + persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) if err != nil { defer t.Close() p.tester.Fatal(err) @@ -83,12 +76,7 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory }() for f.HasNext() { - var ( - key coding.Encoder - value coding.Encoder - ) - - key, value = f.Next() + key, value := f.Next() err = persistence.Put(key, value) if err != nil {