Wrap dto.SampleKey with business logic type.

The curator work can be done more easily if dto.SampleKey is no longer
accessed directly but is instead wrapped in a higher-level type that
captures a modicum of business logic.  This doesn't look terribly
interesting today, but it will become more so.
Matt T. Proud 2013-04-21 19:16:15 +02:00
parent 6ed97fbe8a
commit db4ffbb262
5 changed files with 109 additions and 63 deletions
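
A minimal usage sketch of the new wrapper (editorial, not part of the commit): it assumes the repository's 2013 import path, the timestamps are made up, and the Fingerprint field is left unset because only the time-range logic is exercised.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model"
)

func main() {
	// A key describing a chunk of six samples spanning five minutes.
	key := model.SampleKey{
		FirstTimestamp: time.Unix(1366500000, 0),
		LastTimestamp:  time.Unix(1366500300, 0),
		SampleCount:    6,
	}

	// MayContain captures the range check that callers previously performed
	// by hand against the DTO's encoded timestamps.
	fmt.Println(key.MayContain(time.Unix(1366500150, 0))) // true
	fmt.Println(key.MayContain(time.Unix(1366500600, 0))) // false
}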

model/samplekey.go (new file, 79 additions)

@@ -0,0 +1,79 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
	"code.google.com/p/goprotobuf/proto"
	"github.com/prometheus/prometheus/coding/indexable"
	dto "github.com/prometheus/prometheus/model/generated"
	"time"
)

// SampleKey models the business logic around the data-transfer object
// SampleKey.
type SampleKey struct {
	Fingerprint    Fingerprint
	FirstTimestamp time.Time
	LastTimestamp  time.Time
	SampleCount    uint32
}

// MayContain indicates whether the given SampleKey could potentially contain a
// value at the provided time. Even if true is returned, that does not mean a
// satisfactory value actually exists.
func (s SampleKey) MayContain(t time.Time) (could bool) {
	switch {
	case t.Before(s.FirstTimestamp):
		return
	case t.After(s.LastTimestamp):
		return
	}

	return true
}

// ToDTO converts this SampleKey into a DTO for use in serialization purposes.
func (s SampleKey) ToDTO() (out *dto.SampleKey) {
	out = &dto.SampleKey{
		Fingerprint:   s.Fingerprint.ToDTO(),
		Timestamp:     indexable.EncodeTime(s.FirstTimestamp),
		LastTimestamp: proto.Int64(s.LastTimestamp.Unix()),
		SampleCount:   proto.Uint32(s.SampleCount),
	}

	return
}

// ToPartialDTO converts this SampleKey into a DTO that is only suitable for
// database exploration purposes for a given (Fingerprint, First Sample Time)
// tuple.
func (s SampleKey) ToPartialDTO() (out *dto.SampleKey) {
	out = &dto.SampleKey{
		Fingerprint: s.Fingerprint.ToDTO(),
		Timestamp:   indexable.EncodeTime(s.FirstTimestamp),
	}

	return
}

// NewSampleKeyFromDTO builds a new SampleKey from a provided data-transfer
// object.
func NewSampleKeyFromDTO(dto *dto.SampleKey) SampleKey {
	return SampleKey{
		Fingerprint:    NewFingerprintFromDTO(dto.Fingerprint),
		FirstTimestamp: indexable.DecodeTime(dto.Timestamp),
		LastTimestamp:  time.Unix(*dto.LastTimestamp, 0),
		SampleCount:    *dto.SampleCount,
	}
}
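
One encoding detail worth noting: ToDTO writes FirstTimestamp through indexable.EncodeTime into the DTO's Timestamp field (the representation the storage layer seeks on), while LastTimestamp is flattened to Unix seconds and SampleCount to a plain uint32; NewSampleKeyFromDTO reverses both. A minimal round-trip sketch, assuming the 2013 import path and a key whose Fingerprint is already populated (the package and function names here are illustrative only):

package example

import "github.com/prometheus/prometheus/model"

// roundTrip converts a business-logic SampleKey to its DTO form and back.
func roundTrip(key model.SampleKey) model.SampleKey {
	// The DTO keeps FirstTimestamp in indexable-encoded form and
	// LastTimestamp as Unix seconds, so sub-second precision on
	// LastTimestamp does not survive the trip.
	raw := key.ToDTO()

	return model.NewSampleKeyFromDTO(raw)
}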


@@ -67,10 +67,10 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
d = &diskFrontier{}
-d.firstFingerprint = model.NewFingerprintFromRowKey(*firstKey.Fingerprint.Signature)
-d.firstSupertime = indexable.DecodeTime(firstKey.Timestamp)
-d.lastFingerprint = model.NewFingerprintFromRowKey(*lastKey.Fingerprint.Signature)
-d.lastSupertime = indexable.DecodeTime(lastKey.Timestamp)
+d.firstFingerprint = firstKey.Fingerprint
+d.firstSupertime = firstKey.FirstTimestamp
+d.lastFingerprint = lastKey.Fingerprint
+d.lastSupertime = lastKey.FirstTimestamp
return
}
@@ -111,6 +111,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
upperSeek = indexable.EncodeTime(d.lastSupertime)
}
+// TODO: Convert this to SampleKey.ToPartialDTO.
key := &dto.SampleKey{
Fingerprint: f.ToDTO(),
Timestamp: upperSeek,
@@ -131,7 +132,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
panic(err)
}
-retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature)
+retrievedFingerprint := retrievedKey.Fingerprint
// The returned fingerprint may not match if the original seek key lives
// outside of a metric's frontier. This is probable, for we are seeking to
@@ -146,7 +147,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
if err != nil {
panic(err)
}
-retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature)
+retrievedFingerprint := retrievedKey.Fingerprint
// If the previous key does not match, we know that the requested
// fingerprint does not live in the database.
if !retrievedFingerprint.Equal(f) {
@@ -155,8 +156,8 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
}
s = &seriesFrontier{
-lastSupertime: indexable.DecodeTime(retrievedKey.Timestamp),
-lastTime: time.Unix(*retrievedKey.LastTimestamp, 0),
+lastSupertime: retrievedKey.FirstTimestamp,
+lastTime: retrievedKey.LastTimestamp,
}
key.Timestamp = lowerSeek
@@ -173,9 +174,9 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
panic(err)
}
-retrievedFingerprint = model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature)
+retrievedFingerprint = retrievedKey.Fingerprint
-s.firstSupertime = indexable.DecodeTime(retrievedKey.Timestamp)
+s.firstSupertime = retrievedKey.FirstTimestamp
return
}


@@ -17,7 +17,6 @@ import (
"code.google.com/p/goprotobuf/proto"
"flag"
"github.com/prometheus/prometheus/coding"
-"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
@@ -608,12 +607,12 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
chunk := group[0:take]
group = group[take:lengthOfGroup]
-key := &dto.SampleKey{
-Fingerprint: fingerprint.ToDTO(),
-Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
-LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
-SampleCount: proto.Uint32(uint32(take)),
-}
+key := model.SampleKey{
+Fingerprint: fingerprint,
+FirstTimestamp: chunk[0].Timestamp,
+LastTimestamp: chunk[take-1].Timestamp,
+SampleCount: uint32(take),
+}.ToDTO()
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
@@ -645,26 +644,19 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
return
}
-func extractSampleKey(i leveldb.Iterator) (k *dto.SampleKey, err error) {
-if i == nil {
-panic("nil iterator")
+func extractSampleKey(i leveldb.Iterator) (key model.SampleKey, err error) {
+k := &dto.SampleKey{}
+err = proto.Unmarshal(i.Key(), k)
+if err != nil {
+return
}
-k = &dto.SampleKey{}
-rawKey := i.Key()
-if rawKey == nil {
-panic("illegal condition; got nil key...")
-}
-err = proto.Unmarshal(rawKey, k)
+key = model.NewSampleKeyFromDTO(k)
return
}
func extractSampleValues(i leveldb.Iterator) (v *dto.SampleValueSeries, err error) {
-if i == nil {
-panic("nil iterator")
-}
v = &dto.SampleValueSeries{}
err = proto.Unmarshal(i.Value(), v)
@@ -691,20 +683,6 @@ func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool {
return false
}
-type sampleKeyPredicate func(k *dto.SampleKey) bool
-func keyIsOlderThan(t time.Time) sampleKeyPredicate {
-return func(k *dto.SampleKey) bool {
-return indexable.DecodeTime(k.Timestamp).After(t)
-}
-}
-func keyIsAtMostOld(t time.Time) sampleKeyPredicate {
-return func(k *dto.SampleKey) bool {
-return !indexable.DecodeTime(k.Timestamp).After(t)
-}
-}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
begin := time.Now()


@@ -189,14 +189,6 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste
}
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples model.Values, err error) {
-begin := time.Now()
-defer func() {
-duration := time.Since(begin)
-recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
-}()
k := &dto.SampleKey{
Fingerprint: fp.ToDTO(),
Timestamp: indexable.EncodeTime(i.OldestInclusive),
@@ -210,21 +202,17 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i
iterator := l.metricSamples.NewIterator(true)
defer iterator.Close()
-predicate := keyIsOlderThan(i.NewestInclusive)
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
-retrievedKey := &dto.SampleKey{}
-retrievedKey, err = extractSampleKey(iterator)
+retrievedKey, err := extractSampleKey(iterator)
if err != nil {
-return
+return samples, err
}
-if predicate(retrievedKey) {
+if retrievedKey.FirstTimestamp.After(i.NewestInclusive) {
break
}
-if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
+if !retrievedKey.Fingerprint.Equal(fp) {
break
}
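
The two break conditions in the loop above now read directly off the wrapper. A hypothetical predicate capturing the same test (the helper and package names are illustrative, not part of the commit) could look like:

package example

import (
	"time"

	"github.com/prometheus/prometheus/model"
)

// outOfRange reports whether a scan over sample keys has left the requested
// range: the key starts after the newest inclusive bound, or it belongs to a
// different fingerprint than the one being queried.
func outOfRange(key model.SampleKey, fp model.Fingerprint, newestInclusive time.Time) bool {
	return key.FirstTimestamp.After(newestInclusive) || !key.Fingerprint.Equal(fp)
}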


@@ -472,7 +472,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
targetKey = &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
}
-foundKey = &dto.SampleKey{}
+foundKey model.SampleKey
foundValue *dto.SampleValueSeries
)
@@ -504,7 +504,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
//
// Only do the rewind if there is another chunk before this one.
rewound := false
-firstTime := indexable.DecodeTime(foundKey.Timestamp)
+firstTime := foundKey.FirstTimestamp
if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
iterator.Previous()
rewound = true
@@ -522,7 +522,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
if err != nil {
panic(err)
}
-currentChunkLastTime := time.Unix(*foundKey.LastTimestamp, 0)
+currentChunkLastTime := foundKey.LastTimestamp
if ts.After(currentChunkLastTime) {
sampleCount := len(foundValue.Value)