diff --git a/coding/protocol_buffer.go b/coding/protocol_buffer.go index 273395207..19c8bef15 100644 --- a/coding/protocol_buffer.go +++ b/coding/protocol_buffer.go @@ -15,13 +15,14 @@ package coding import ( "code.google.com/p/goprotobuf/proto" + "fmt" ) -type ProtocolBufferEncoder struct { +type ProtocolBuffer struct { message proto.Message } -func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { +func (p ProtocolBuffer) Encode() (raw []byte, err error) { raw, err = proto.Marshal(p.message) // XXX: Adjust legacy users of this to not check for error. @@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { return } -func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { - return &ProtocolBufferEncoder{ +func (p ProtocolBuffer) String() string { + return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message) +} + +func NewProtocolBuffer(message proto.Message) *ProtocolBuffer { + return &ProtocolBuffer{ message: message, } } diff --git a/storage/metric/.gitignore b/storage/metric/.gitignore new file mode 100644 index 000000000..3460f0346 --- /dev/null +++ b/storage/metric/.gitignore @@ -0,0 +1 @@ +command-line-arguments.test diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 62918dc43..592f4ae87 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -37,8 +37,9 @@ type curator struct { // watermarks is the on-disk store that is scanned for high watermarks for // given metrics. watermarks raw.Persistence - // cutOff represents the most recent time up to which values will be curated. - cutOff time.Time + // recencyThreshold represents the most recent time up to which values will be + // curated. + recencyThreshold time.Duration // groupingQuantity represents the number of samples below which encountered // samples will be dismembered and reaggregated into larger groups. groupingQuantity uint32 @@ -48,9 +49,9 @@ type curator struct { } // newCurator builds a new curator for the given LevelDB databases. -func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { +func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { return curator{ - cutOff: cutOff, + recencyThreshold: recencyThreshold, stop: make(chan bool), samples: samples, curationState: curationState, @@ -60,19 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample } // run facilitates the curation lifecycle. -func (c curator) run() (err error) { - var ( - decoder watermarkDecoder - filter = watermarkFilter{ - stop: c.stop, - curationState: c.curationState, - } - operator = watermarkOperator{ - olderThan: c.cutOff, - groupSize: c.groupingQuantity, - curationState: c.curationState, - } - ) +func (c curator) run(instant time.Time) (err error) { + decoder := watermarkDecoder{} + filter := watermarkFilter{ + stop: c.stop, + curationState: c.curationState, + groupSize: c.groupingQuantity, + recencyThreshold: c.recencyThreshold, + } + operator := watermarkOperator{ + olderThan: instant.Add(-1 * c.recencyThreshold), + groupSize: c.groupingQuantity, + curationState: c.curationState, + } _, err = c.watermarks.ForEach(decoder, filter, operator) @@ -126,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro // watermarkFilter determines whether to include or exclude candidate // values from the curation process by virtue of how old the high watermark is. type watermarkFilter struct { - // curationState is the table of CurationKey to CurationValues that remark on + // curationState is the table of CurationKey to CurationValues that rema // far along the curation process has gone for a given metric fingerprint. curationState raw.Persistence // stop, when non-empty, instructs the filter to stop operation. stop chan bool + // groupSize refers to the target groupSize from the curator. + groupSize uint32 + // recencyThreshold refers to the target recencyThreshold from the curator. + recencyThreshold time.Duration } func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { - var ( - fingerprint = key.(model.Fingerprint) - watermark = value.(model.Watermark) - curationKey = fingerprint.ToDTO() - rawCurationValue []byte - err error - curationValue = &dto.CurationValue{} - ) + fingerprint := key.(model.Fingerprint) + watermark := value.(model.Watermark) + curationKey := &dto.CurationKey{ + Fingerprint: fingerprint.ToDTO(), + MinimumGroupSize: proto.Uint32(w.groupSize), + OlderThan: proto.Int64(int64(w.recencyThreshold)), + } + curationValue := &dto.CurationValue{} - rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { panic(err) } @@ -229,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er MinimumGroupSize: proto.Uint32(w.groupSize), } - curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey)) + curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey)) return } @@ -247,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod } ) - rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey)) if err != nil { return } diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go index 7757ae4f5..c950ffb42 100644 --- a/storage/metric/curator_test.go +++ b/storage/metric/curator_test.go @@ -27,10 +27,10 @@ import ( type ( curationState struct { - fingerprint string - groupSize int - olderThan time.Duration - lastCurated time.Time + fingerprint string + groupSize int + recencyThreshold time.Duration + lastCurated time.Time } watermarkState struct { @@ -48,21 +48,23 @@ type ( values []sample } - context struct { - curationStates fixture.Pairs - watermarkStates fixture.Pairs - sampleGroups fixture.Pairs + in struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs + recencyThreshold time.Duration + groupSize uint32 } ) func (c curationState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.CurationKey{ + key = coding.NewProtocolBuffer(&dto.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), - OlderThan: proto.Int64(int64(c.olderThan)), + OlderThan: proto.Int64(int64(c.recencyThreshold)), }) - value = coding.NewProtocolBufferEncoder(&dto.CurationValue{ + value = coding.NewProtocolBuffer(&dto.CurationValue{ LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), }) @@ -70,13 +72,13 @@ func (c curationState) Get() (key, value coding.Encoder) { } func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) return } func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBufferEncoder(&dto.SampleKey{ + key = coding.NewProtocolBuffer(&dto.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), Timestamp: indexable.EncodeTime(s.values[0].time), LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), @@ -92,7 +94,7 @@ func (s sampleGroup) Get() (key, value coding.Encoder) { }) } - value = coding.NewProtocolBufferEncoder(series) + value = coding.NewProtocolBuffer(series) return } @@ -100,22 +102,31 @@ func (s sampleGroup) Get() (key, value coding.Encoder) { func TestCurator(t *testing.T) { var ( scenarios = []struct { - context context + in in }{ { - context: context{ + in: in{ + recencyThreshold: 1 * time.Hour, + groupSize: 5, curationStates: fixture.Pairs{ curationState{ - fingerprint: "0001-A-1-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 30 * time.Minute), + fingerprint: "0001-A-1-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), }, curationState{ - fingerprint: "0002-A-2-Z", - groupSize: 5, - olderThan: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 90 * time.Minute), + fingerprint: "0002-A-2-Z", + groupSize: 5, + recencyThreshold: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + // This rule should effectively be ignored. + curationState{ + fingerprint: "0002-A-2-Z", + groupSize: 2, + recencyThreshold: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), }, }, watermarkStates: fixture.Pairs{ @@ -124,7 +135,7 @@ func TestCurator(t *testing.T) { lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, watermarkState{ - fingerprint: "0002-A-1-Z", + fingerprint: "0002-A-2-Z", lastAppended: testInstant.Add(-1 * 15 * time.Minute), }, }, @@ -479,26 +490,26 @@ func TestCurator(t *testing.T) { ) for _, scenario := range scenarios { - curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates)) + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) defer curatorDirectory.Close() - watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates)) + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) defer watermarkDirectory.Close() - sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups)) + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorState, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer curatorState.Close() + defer curatorStates.Close() - watermarkState, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) if err != nil { t.Fatal(err) } - defer watermarkState.Close() + defer watermarkStates.Close() samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) if err != nil { @@ -506,5 +517,7 @@ func TestCurator(t *testing.T) { } defer samples.Close() + c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates) + c.run(testInstant) } } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 9c4183902..ecc5c9321 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) Timestamp: upperSeek, } - raw, err := coding.NewProtocolBufferEncoder(key).Encode() + raw, err := coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } @@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) key.Timestamp = lowerSeek - raw, err = coding.NewProtocolBufferEncoder(key).Encode() + raw, err = coding.NewProtocolBuffer(key).Encode() if err != nil { panic(err) } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 654ae978b..d1d59f98d 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -339,7 +339,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelNameToFingerprints.Commit(batch) @@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } err = l.labelSetToFingerprints.Commit(batch) @@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(fingerprint.ToDTO()) + value := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, value) } @@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // WART: We should probably encode simple fingerprints. for _, metric := range absentMetrics { - key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger value = &dto.MetricHighWatermark{} raw []byte newestSampleTimestamp = samples[len(samples)-1].Timestamp - keyEncoded = coding.NewProtocolBufferEncoder(key) + keyEncoded = coding.NewProtocolBuffer(key) ) key.Signature = proto.String(fingerprint.ToRowKey()) @@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } } value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value)) + batch.Put(keyEncoded, coding.NewProtocolBuffer(value)) mutationCount++ } @@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) } } @@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.metricMembershipIndex.Has(dtoKey) return @@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) return @@ -782,7 +782,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() - dtoKey := coding.NewProtocolBufferEncoder(dto) + dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) return @@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)) + f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO)) if err != nil { return fps, err } @@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() - raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) + raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) if err != nil { return } @@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() - raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) + raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) if err != nil { return } @@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T Timestamp: indexable.EncodeTime(t), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } @@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model. Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e, err := coding.NewProtocolBufferEncoder(k).Encode() + e, err := coding.NewProtocolBuffer(k).Encode() if err != nil { return } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index eabcc99bd..81d0fee67 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier } // Try seeking to target key. - rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode() iterator.Seek(rawKey) foundKey, err := extractSampleKey(iterator) diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index e00152d3d..a877a6d87 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -21,7 +21,7 @@ import ( ) var ( - existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{}) + existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{}) ) type LevelDBMembershipIndex struct { diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index 0499b8dac..2cb8d35d9 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -15,7 +15,6 @@ package test import ( "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility/test" ) @@ -64,13 +63,7 @@ type ( func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { t = test.NewTemporaryDirectory(n, p.tester) - - var ( - persistence raw.Persistence - err error - ) - - persistence, err = leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) + persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) if err != nil { defer t.Close() p.tester.Fatal(err) @@ -83,12 +76,7 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory }() for f.HasNext() { - var ( - key coding.Encoder - value coding.Encoder - ) - - key, value = f.Next() + key, value := f.Next() err = persistence.Put(key, value) if err != nil {