diff --git a/Makefile b/Makefile index 9b9582be7..e63486240 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,8 @@ build: $(MAKE) -C model $(MAKE) -C web go build ./... + +binary: build go build -o prometheus.build clean: @@ -44,4 +46,4 @@ search_index: documentation: search_index godoc -http=:6060 -index -index_files='search_index' -.PHONY: advice build clean documentation format search_index test +.PHONY: advice binary build clean documentation format search_index test diff --git a/model/fingerprinting.go b/model/fingerprinting.go index 6dd1257f9..efdd64fa8 100644 --- a/model/fingerprinting.go +++ b/model/fingerprinting.go @@ -120,6 +120,10 @@ type fingerprint struct { lastCharacterOfLastLabelValue string } +func (f fingerprint) String() string { + return f.ToRowKey() +} + func (f fingerprint) ToRowKey() string { return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter) } diff --git a/model/watermark.go b/model/watermark.go index 93cf47c91..e6126bd41 100644 --- a/model/watermark.go +++ b/model/watermark.go @@ -14,6 +14,7 @@ package model import ( + "code.google.com/p/goprotobuf/proto" dto "github.com/prometheus/prometheus/model/generated" "time" ) @@ -24,6 +25,14 @@ type Watermark struct { time.Time } +// ToMetricHighWatermarkDTO builds a MetricHighWatermark DTO out of a given +// Watermark. +func (w Watermark) ToMetricHighWatermarkDTO() *dto.MetricHighWatermark { + return &dto.MetricHighWatermark{ + Timestamp: proto.Int64(w.Time.Unix()), + } +} + // NewWatermarkFromHighWatermarkDTO builds Watermark from the provided // dto.MetricHighWatermark object. func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { @@ -31,3 +40,10 @@ func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { time.Unix(*d.Timestamp, 0), } } + +// NewWatermarkFromTime builds a new Watermark for the provided time. +func NewWatermarkFromTime(t time.Time) Watermark { + return Watermark{ + t, + } +} diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 95f82c42c..3be9c49da 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -177,10 +177,10 @@ type ( type ( // Matrix literal, i.e. metric name plus labelset and timerange. MatrixLiteral struct { - labels model.LabelSet + labels model.LabelSet // Fingerprints are populated from labels at query analysis time. fingerprints model.Fingerprints - interval time.Duration + interval time.Duration } ) diff --git a/storage/metric/curator.go b/storage/metric/curator.go index fb3d64646..62918dc43 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -64,7 +64,8 @@ func (c curator) run() (err error) { var ( decoder watermarkDecoder filter = watermarkFilter{ - stop: c.stop, + stop: c.stop, + curationState: c.curationState, } operator = watermarkOperator{ olderThan: c.cutOff, diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 5adb75c2e..3cce65558 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -14,184 +14,464 @@ package metric import ( + "code.google.com/p/goprotobuf/proto" "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/raw" - "sort" + dto "github.com/prometheus/prometheus/model/generated" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" "testing" "time" ) type ( - keyPair struct { - fingerprint model.Fingerprint - time time.Time + curationState struct { + fingerprint string + groupSize int + olderThan time.Duration + lastCurated time.Time } - fakeCurationStates map[model.Fingerprint]time.Time - fakeSamples map[keyPair][]float32 - fakeWatermarks map[model.Fingerprint]time.Time - - in struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks - cutOff time.Time - grouping uint32 + watermarkState struct { + fingerprint string + lastAppended time.Time } - out struct { - curationStates fakeCurationStates - samples fakeSamples - watermarks fakeWatermarks + sample struct { + time time.Time + value float32 + } + + sampleGroup struct { + fingerprint string + values []sample + } + + context struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs } ) -func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (c curationState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), + MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), + OlderThan: proto.Int64(int64(c.olderThan)), + }) -func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeCurationStates) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeCurationStates) Close() error { - panic("unimplemented") -} - -func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - var ( - fingerprints model.Fingerprints - ) - - for f := range c { - fingerprints = append(fingerprints, f) - } - - sort.Sort(fingerprints) - - for _, k := range fingerprints { - v := c[k] - - var ( - decodedKey interface{} - decodedValue interface{} - ) - - decodedKey, err = d.DecodeKey(k) - if err != nil { - continue - } - - decodedValue, err = d.DecodeValue(v) - if err != nil { - continue - } - - switch f.Filter(decodedKey, decodedValue) { - case storage.STOP: - return - case storage.SKIP: - continue - case storage.ACCEPT: - opErr := o.Operate(decodedKey, decodedValue) - if opErr != nil { - if opErr.Continuable { - continue - } - break - } - } - } + value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), + }) return } -func (c fakeCurationStates) Commit(_ raw.Batch) error { - panic("unimplemented") +func (w watermarkState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + return } -func (c fakeSamples) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} +func (s sampleGroup) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), + Timestamp: indexable.EncodeTime(s.values[0].time), + LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), + SampleCount: proto.Uint32(uint32(len(s.values))), + }) -func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} + series := &dto.SampleValueSeries{} -func (c fakeSamples) Drop(_ coding.Encoder) error { - panic("unimplemented") -} + for _, value := range s.values { + series.Value = append(series.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(value.time.Unix()), + Value: proto.Float32(float32(value.value)), + }) + } -func (c fakeSamples) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} + value = coding.NewProtocolBufferEncoder(series) -func (c fakeSamples) Close() error { - panic("unimplemented") -} - -func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeSamples) Commit(_ raw.Batch) (err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Drop(_ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Put(_, _ coding.Encoder) error { - panic("unimplemented") -} - -func (c fakeWatermarks) Close() error { - panic("unimplemented") -} - -func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { - panic("unimplemented") -} - -func (c fakeWatermarks) Commit(_ raw.Batch) (err error) { - panic("unimplemented") + return } func TestCurator(t *testing.T) { var ( scenarios = []struct { - in in - out out + context context }{ { - in: in{ - curationStates: fakeCurationStates{ - model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute), - model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute), + context: context{ + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + groupSize: 5, + olderThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + }, + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 5, + olderThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 55 * time.Minute), + value: 2, + }, + { + time: testInstant.Add(-1 * 50 * time.Minute), + value: 3, + }, + { + time: testInstant.Add(-1 * 45 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 40 * time.Minute), + value: 0, + }, + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 25 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 35 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 30 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 90 * time.Minute), + value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 89 * time.Minute), + value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 88 * time.Minute), + value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 87 * time.Minute), + value: 3, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 86 * time.Minute), + value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 85 * time.Minute), + value: 5, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 84 * time.Minute), + value: 6, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 83 * time.Minute), + value: 7, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 82 * time.Minute), + value: 8, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 81 * time.Minute), + value: 9, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 80 * time.Minute), + value: 10, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 79 * time.Minute), + value: 11, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 78 * time.Minute), + value: 12, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 77 * time.Minute), + value: 13, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 76 * time.Minute), + value: 14, + }, + { + time: testInstant.Add(-1 * 75 * time.Minute), + value: 15, + }, + { + time: testInstant.Add(-1 * 74 * time.Minute), + value: 16, + }, + { + time: testInstant.Add(-1 * 73 * time.Minute), + value: 17, + }, + { + time: testInstant.Add(-1 * 72 * time.Minute), + value: 18, + }, + { + time: testInstant.Add(-1 * 71 * time.Minute), + value: 19, + }, + { + time: testInstant.Add(-1 * 70 * time.Minute), + value: 20, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 69 * time.Minute), + value: 21, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 68 * time.Minute), + value: 22, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 67 * time.Minute), + value: 23, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 66 * time.Minute), + value: 24, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 65 * time.Minute), + value: 25, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 64 * time.Minute), + value: 26, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 63 * time.Minute), + value: 27, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 62 * time.Minute), + value: 28, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 61 * time.Minute), + value: 29, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: []sample{ + { + time: testInstant.Add(-1 * 60 * time.Minute), + value: 30, + }, + }, + }, }, - watermarks: fakeWatermarks{}, - samples: fakeSamples{}, - cutOff: testInstant.Add(5 * time.Minute), - grouping: 5, }, }, } @@ -199,15 +479,13 @@ func TestCurator(t *testing.T) { for _, scenario := range scenarios { var ( - in = scenario.in - - curationStates = in.curationStates - samples = in.samples - watermarks = in.watermarks - cutOff = in.cutOff - grouping = in.grouping + curatorDirectory = fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + watermarkDirectory = fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + sampleDirectory = fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) ) + defer curatorDirectory.Close() + defer watermarkDirectory.Close() + defer sampleDirectory.Close() - _ = newCurator(cutOff, grouping, curationStates, samples, watermarks) } } diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 42979a6a4..cb3381fb0 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -63,6 +63,8 @@ var ( ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, } + curationDuration = metrics.NewCounter() + curationDurations = metrics.NewHistogram(diskLatencyHistogram) storageOperations = metrics.NewCounter() storageOperationDurations = metrics.NewCounter() storageLatency = metrics.NewHistogram(diskLatencyHistogram) diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index 959f3f784..c4c08a668 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -21,7 +21,9 @@ import ( ) var ( - testInstant = time.Time{} + // ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20`` + usEastern, _ = time.LoadLocation("US/Eastern") + testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern) ) func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) { diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index ed01cab08..1eccb772b 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -14,7 +14,6 @@ package metric import ( - "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/utility/test" "sort" @@ -380,9 +379,7 @@ func testMakeView(t test.Tester) { } } - start := time.Now() tiered.Flush() - fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data)) requestBuilder := NewViewRequestBuilder() diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 81346f938..0499b8dac 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -75,6 +75,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory defer t.Close() p.tester.Fatal(err) } + defer func() { + err = persistence.Close() + if err != nil { + p.tester.Fatal(err) + } + }() for f.HasNext() { var (