Merge pull request #254 from prometheus/feature/refactor/serialization

Several Refactorings: Simplified Encoder / Reduction of Naked Return Values
This commit is contained in:
juliusv 2013-05-16 07:01:27 -07:00
commit e5d077e146
32 changed files with 172 additions and 209 deletions

View file

@ -14,5 +14,5 @@
package coding
type Encoder interface {
Encode() ([]byte, error)
MustEncode() []byte
}

View file

@ -18,27 +18,26 @@ import (
"fmt"
)
type ProtocolBuffer struct {
type PBEncoder struct {
message proto.Message
}
func (p ProtocolBuffer) Encode() (raw []byte, err error) {
raw, err = proto.Marshal(p.message)
// XXX: Adjust legacy users of this to not check for error.
func (p PBEncoder) MustEncode() []byte {
raw, err := proto.Marshal(p.message)
if err != nil {
panic(err)
}
return
return raw
}
func (p ProtocolBuffer) String() string {
return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message)
func (p PBEncoder) String() string {
return fmt.Sprintf("PBEncoder of %T", p.message)
}
func NewProtocolBuffer(message proto.Message) *ProtocolBuffer {
return &ProtocolBuffer{
message: message,
}
// BUG(matt): Replace almost all calls to this with mechanisms that wrap the
// underlying protocol buffers with business logic types that simply encode
// themselves. If all points are done, then we'll no longer need this type.
func NewPBEncoder(m proto.Message) PBEncoder {
return PBEncoder{message: m}
}

View file

@ -83,14 +83,13 @@ func (c Config) Validate() error {
}
// GetJobByName finds a job by its name in a Config object.
func (c Config) GetJobByName(name string) (jobConfig *JobConfig) {
func (c Config) GetJobByName(name string) *JobConfig {
for _, job := range c.Job {
if job.GetName() == name {
jobConfig = &JobConfig{*job}
break
return &JobConfig{*job}
}
}
return
return nil
}
// Jobs returns all the jobs in a Config object.

View file

@ -19,10 +19,10 @@ import (
"io/ioutil"
)
func LoadFromString(configStr string) (config Config, err error) {
func LoadFromString(configStr string) (Config, error) {
configProto := pb.PrometheusConfig{}
if err = proto.UnmarshalText(configStr, &configProto); err != nil {
return
if err := proto.UnmarshalText(configStr, &configProto); err != nil {
return Config{}, err
}
if configProto.Global == nil {
configProto.Global = &pb.GlobalConfig{}
@ -32,17 +32,18 @@ func LoadFromString(configStr string) (config Config, err error) {
job.ScrapeInterval = proto.String(configProto.Global.GetScrapeInterval())
}
}
config = Config{configProto}
err = config.Validate()
return
config := Config{configProto}
err := config.Validate()
return config, err
}
func LoadFromFile(fileName string) (config Config, err error) {
func LoadFromFile(fileName string) (Config, error) {
configStr, err := ioutil.ReadFile(fileName)
if err != nil {
return
return Config{}, err
}
config, err = LoadFromString(string(configStr))
return
return LoadFromString(string(configStr))
}

View file

@ -67,16 +67,16 @@ type CurationKey struct {
}
// Equal answers whether the two CurationKeys are equivalent.
func (c CurationKey) Equal(o CurationKey) (equal bool) {
func (c CurationKey) Equal(o CurationKey) bool {
switch {
case !c.Fingerprint.Equal(o.Fingerprint):
return
return false
case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
return
return false
case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
return
return false
case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
return
return false
}
return true

View file

@ -45,7 +45,7 @@ type Fingerprint interface {
}
// Builds a Fingerprint from a row key.
func NewFingerprintFromRowKey(rowKey string) (f Fingerprint) {
func NewFingerprintFromRowKey(rowKey string) Fingerprint {
components := strings.Split(rowKey, rowKeyDelimiter)
hash, err := strconv.ParseUint(components[0], 10, 64)
if err != nil {
@ -70,7 +70,7 @@ func NewFingerprintFromDTO(f *dto.Fingerprint) Fingerprint {
}
// Decomposes a Metric into a Fingerprint.
func NewFingerprintFromMetric(metric Metric) (f Fingerprint) {
func NewFingerprintFromMetric(metric Metric) Fingerprint {
labelLength := len(metric)
labelNames := make([]string, 0, labelLength)
@ -184,7 +184,7 @@ func (f Fingerprints) Len() int {
return len(f)
}
func (f Fingerprints) Less(i, j int) (less bool) {
func (f Fingerprints) Less(i, j int) bool {
return f[i].Less(f[j])
}

View file

@ -24,7 +24,7 @@ func (l LabelPairs) Len() int {
return len(l)
}
func (l LabelPairs) Less(i, j int) (less bool) {
func (l LabelPairs) Less(i, j int) bool {
if l[i].Name < l[j].Name {
return true
}

View file

@ -82,12 +82,12 @@ func (l LabelSet) String() string {
return buffer.String()
}
func (l LabelSet) ToMetric() (metric Metric) {
metric = Metric{}
func (l LabelSet) ToMetric() Metric {
metric := Metric{}
for label, value := range l {
metric[label] = value
}
return
return metric
}
// A Metric is similar to a LabelSet, but the key difference is that a Metric is
@ -168,21 +168,18 @@ func (v Values) LastTimeBefore(t time.Time) bool {
// InsideInterval indicates whether a given range of sorted values could contain
// a value for a given time.
func (v Values) InsideInterval(t time.Time) (s bool) {
if v.Len() == 0 {
return
}
if t.Before(v[0].Timestamp) {
return
}
if !v[v.Len()-1].Timestamp.Before(t) {
return
}
func (v Values) InsideInterval(t time.Time) bool {
switch {
case v.Len() == 0:
return false
case t.Before(v[0].Timestamp):
return false
case !v[v.Len()-1].Timestamp.Before(t):
return false
default:
return true
}
}
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
@ -239,7 +236,7 @@ func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
})
}
return
return v
}
type SampleSet struct {

View file

@ -43,17 +43,16 @@ func (s Samples) Len() int {
return len(s)
}
func (s Samples) Less(i, j int) (less bool) {
if NewFingerprintFromMetric(s[i].Metric).Less(NewFingerprintFromMetric(s[j].Metric)) {
func (s Samples) Less(i, j int) bool {
switch {
case NewFingerprintFromMetric(s[i].Metric).Less(NewFingerprintFromMetric(s[j].Metric)):
return true
}
if s[i].Timestamp.Before(s[j].Timestamp) {
case s[i].Timestamp.Before(s[j].Timestamp):
return true
}
default:
return false
}
}
func (s Samples) Swap(i, j int) {
s[i], s[j] = s[j], s[i]

View file

@ -33,16 +33,16 @@ type SampleKey struct {
// MayContain indicates whether the given SampleKey could potentially contain a
// value at the provided time. Even if true is emitted, that does not mean a
// satisfactory value, in fact, exists.
func (s SampleKey) MayContain(t time.Time) (could bool) {
func (s SampleKey) MayContain(t time.Time) bool {
switch {
case t.Before(s.FirstTimestamp):
return
return false
case t.After(s.LastTimestamp):
return
}
return false
default:
return true
}
}
// ToDTO converts this SampleKey into a DTO for use in serialization purposes.
func (s SampleKey) ToDTO() (out *dto.SampleKey) {

View file

@ -34,22 +34,19 @@ type Registry interface {
type registry struct {
}
func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Processor, err error) {
func (r *registry) ProcessorForRequestHeader(header http.Header) (Processor, error) {
if header == nil {
err = fmt.Errorf("Received illegal and nil header.")
return
return nil, fmt.Errorf("Received illegal and nil header.")
}
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
err = fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
return
return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
}
if mediatype != "application/json" {
err = fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
return
return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
}
var prometheusApiVersion string
@ -62,13 +59,10 @@ func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Proc
switch prometheusApiVersion {
case "0.0.2":
processor = Processor002
return
return Processor002, nil
case "0.0.1":
processor = Processor001
return
return Processor001, nil
default:
err = fmt.Errorf("Unrecognized API version %s", prometheusApiVersion)
return
return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion)
}
}

View file

@ -58,20 +58,19 @@ type entity001 []struct {
} `json:"metric"`
}
func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) {
func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
// TODO(matt): Replace with plain-jane JSON unmarshalling.
defer stream.Close()
buffer, err := ioutil.ReadAll(stream)
if err != nil {
return
return err
}
entities := entity001{}
err = json.Unmarshal(buffer, &entities)
if err != nil {
return
if err = json.Unmarshal(buffer, &entities); err != nil {
return err
}
// TODO(matt): This outer loop is a great basis for parallelization.
@ -140,7 +139,6 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
}
break
default:
}
}
}
@ -148,5 +146,5 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
results <- Result{Samples: pendingSamples}
}
return
return nil
}

View file

@ -177,7 +177,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
t.scheduler.Reschedule(earliest, futureState)
t.state = futureState
return
return err
}
func (t *target) scrape(timestamp time.Time, results chan format.Result) (err error) {
@ -194,13 +194,13 @@ func (t *target) scrape(timestamp time.Time, results chan format.Result) (err er
resp, err := t.client.Get(t.Address())
if err != nil {
return
return err
}
defer resp.Body.Close()
processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
if err != nil {
return
return err
}
// XXX: This is a wart; we need to handle this more gracefully down the

View file

@ -53,7 +53,7 @@ func (m *targetManager) release() {
<-m.requestAllowance
}
func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) (targetPool *TargetPool) {
func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) *TargetPool {
targetPool, ok := m.poolsByJob[job.GetName()]
if !ok {
@ -62,9 +62,11 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInte
interval := job.ScrapeInterval()
m.poolsByJob[job.GetName()] = targetPool
// BUG(all): Investigate whether this auto-goroutine creation is desired.
go targetPool.Run(m.results, interval)
}
return
return targetPool
}
func (m *targetManager) AddTarget(job config.JobConfig, t Target, defaultScrapeInterval time.Duration) {

View file

@ -32,7 +32,7 @@ type TargetPool struct {
replaceTargetsQueue chan []Target
}
func NewTargetPool(m TargetManager) (p *TargetPool) {
func NewTargetPool(m TargetManager) *TargetPool {
return &TargetPool{
manager: m,
addTargetQueue: make(chan Target),
@ -151,7 +151,7 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
}
// XXX: Not really thread-safe. Only used in /status page for now.
// BUG(all): Not really thread-safe. Only used in /status page for now.
func (p *TargetPool) Targets() []Target {
return p.targets
}

View file

@ -25,14 +25,15 @@ import (
// States that active alerts can be in.
type alertState int
func (s alertState) String() (state string) {
func (s alertState) String() string {
switch s {
case PENDING:
state = "pending"
return "pending"
case FIRING:
state = "firing"
return "firing"
default:
panic("undefined")
}
return
}
const (
@ -88,15 +89,15 @@ type AlertingRule struct {
func (rule AlertingRule) Name() string { return rule.name }
func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage)
}
func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
// Get the raw value of the rule expression.
exprResult, err := rule.EvalRaw(timestamp, storage)
if err != nil {
return
return nil, err
}
// Create pending alerts for any new vector elements in the alert expression.
@ -115,6 +116,8 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage
}
}
vector := ast.Vector{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, activeAlert := range rule.activeAlerts {
if !resultFingerprints.Has(fp) {
@ -130,7 +133,8 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage
vector = append(vector, activeAlert.sample(timestamp, 1))
}
return
return vector, nil
}
func (rule AlertingRule) ToDotGraph() string {

View file

@ -269,14 +269,14 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.Tie
return
}
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (matrix Matrix, err error) {
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (Matrix, error) {
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON.
matrix = Matrix{}
matrix := Matrix{}
viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage)
if err != nil {
return
return nil, err
}
// TODO implement watchdog timer for long-running queries.
sampleSets := map[string]*model.SampleSet{}
@ -302,7 +302,8 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t
for _, sampleSet := range sampleSets {
matrix = append(matrix, *sampleSet)
}
return
return matrix, nil
}
func labelIntersection(metric1, metric2 model.Metric) model.Metric {

View file

@ -146,7 +146,7 @@ func (sorter vectorByValueSorter) Len() int {
return len(sorter.vector)
}
func (sorter vectorByValueSorter) Less(i, j int) (less bool) {
func (sorter vectorByValueSorter) Less(i, j int) bool {
return sorter.vector[i].Value < sorter.vector[j].Value
}

View file

@ -60,7 +60,7 @@ func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *m
// surrounding a given target time. If samples are found both before and after
// the target time, the sample value is interpolated between these. Otherwise,
// the single closest sample is returned verbatim.
func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) (sample *model.SamplePair) {
func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) *model.SamplePair {
var closestBefore *model.SamplePair
var closestAfter *model.SamplePair
for _, candidate := range samples {
@ -96,14 +96,12 @@ func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.T
switch {
case closestBefore != nil && closestAfter != nil:
sample = interpolateSamples(closestBefore, closestAfter, timestamp)
return interpolateSamples(closestBefore, closestAfter, timestamp)
case closestBefore != nil:
sample = closestBefore
return closestBefore
default:
sample = closestAfter
return closestAfter
}
return
}
func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) {
@ -122,7 +120,7 @@ func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp
})
}
}
return
return samples, err
}
func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) {

View file

@ -95,7 +95,7 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
}
}
func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) {
func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (*viewAdapter, error) {
analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node)
viewBuilder := metric.NewViewRequestBuilder()
@ -106,13 +106,13 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.
viewBuilder.GetMetricAtTime(fingerprint, timestamp)
}
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second)
if err == nil {
viewAdapter = NewViewAdapter(view, storage)
if err != nil {
return nil, err
}
return
return NewViewAdapter(view, storage), nil
}
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) {
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (*viewAdapter, error) {
analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node)
viewBuilder := metric.NewViewRequestBuilder()
@ -126,8 +126,8 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva
viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval)
}
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second)
if err == nil {
viewAdapter = NewViewAdapter(view, storage)
if err != nil {
return nil, err
}
return
return NewViewAdapter(view, storage), nil
}

View file

@ -31,15 +31,15 @@ type RecordingRule struct {
func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage)
}
func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
// Get the raw value of the rule expression.
vector, err = rule.EvalRaw(timestamp, storage)
vector, err := rule.EvalRaw(timestamp, storage)
if err != nil {
return
return nil, err
}
// Override the metric name and labels.
@ -53,7 +53,8 @@ func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorag
}
}
}
return
return vector, nil
}
func (rule RecordingRule) ToDotGraph() string {

View file

@ -26,9 +26,9 @@ type Rule interface {
Name() string
// EvalRaw evaluates the rule's vector expression without triggering any
// other actions, like recording or alerting.
EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error)
EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error)
// Eval evaluates the rule, including any associated recording or alerting actions.
Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error)
Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error)
// ToDotGraph returns a Graphviz dot graph of the rule.
ToDotGraph() string
}

View file

@ -220,7 +220,7 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge
}.ToDTO()
curationValue := &dto.CurationValue{}
rawKey := coding.NewProtocolBuffer(curationKey)
rawKey := coding.NewPBEncoder(curationKey)
has, err := states.Has(rawKey)
if err != nil {
@ -341,11 +341,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
}
prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode()
if err != nil {
// An encoding failure of a key is no reason to stop.
return &storage.OperatorError{error: err, Continuable: true}
}
prospectiveKey := coding.NewPBEncoder(startKey.ToDTO()).MustEncode()
if !w.sampleIterator.Seek(prospectiveKey) {
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
// no work may occur, and the iterator cannot be recovered.
@ -390,7 +386,7 @@ func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished t
LastCompletionTimestamp: finished,
}.ToDTO()
err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue))
err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue))
return
}

View file

@ -115,10 +115,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
Timestamp: upperSeek,
}
raw, err := coding.NewProtocolBuffer(key).Encode()
if err != nil {
panic(err)
}
raw := coding.NewPBEncoder(key).MustEncode()
i.Seek(raw)
if i.Key() == nil {
@ -160,10 +157,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
key.Timestamp = lowerSeek
raw, err = coding.NewProtocolBuffer(key).Encode()
if err != nil {
panic(err)
}
raw = coding.NewPBEncoder(key).MustEncode()
i.Seek(raw)

View file

@ -305,7 +305,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO())
}
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value))
}
err = l.labelNameToFingerprints.Commit(batch)
@ -377,7 +377,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO())
}
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value))
}
err = l.labelSetToFingerprints.Commit(batch)
@ -403,8 +403,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
defer batch.Close()
for fingerprint, metric := range metrics {
key := coding.NewProtocolBuffer(fingerprint.ToDTO())
value := coding.NewProtocolBuffer(model.MetricToDTO(metric))
key := coding.NewPBEncoder(fingerprint.ToDTO())
value := coding.NewPBEncoder(model.MetricToDTO(metric))
batch.Put(key, value)
}
@ -469,7 +469,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
// WART: We should probably encode simple fingerprints.
for _, metric := range absentMetrics {
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
key := coding.NewPBEncoder(model.MetricToDTO(metric))
batch.Put(key, key)
}
@ -498,7 +498,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
value := &dto.MetricHighWatermark{}
raw := []byte{}
newestSampleTimestamp := samples[len(samples)-1].Timestamp
keyEncoded := coding.NewProtocolBuffer(key)
keyEncoded := coding.NewPBEncoder(key)
key.Signature = proto.String(fingerprint.ToRowKey())
raw, err = l.MetricHighWatermarks.Get(keyEncoded)
@ -517,7 +517,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
}
}
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(keyEncoded, coding.NewProtocolBuffer(value))
batch.Put(keyEncoded, coding.NewPBEncoder(value))
mutationCount++
}
@ -592,7 +592,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
})
}
samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value))
}
}
@ -665,7 +665,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}(time.Now())
dtoKey := coding.NewProtocolBuffer(dto)
dtoKey := coding.NewPBEncoder(dto)
value, err = l.metricMembershipIndex.Has(dtoKey)
return
@ -678,7 +678,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now())
dtoKey := coding.NewProtocolBuffer(dto)
dtoKey := coding.NewPBEncoder(dto)
value, err = l.labelSetToFingerprints.Has(dtoKey)
return
@ -691,7 +691,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}(time.Now())
dtoKey := coding.NewProtocolBuffer(dto)
dtoKey := coding.NewPBEncoder(dto)
value, err = l.labelNameToFingerprints.Has(dtoKey)
return
@ -707,7 +707,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
sets := []utility.Set{}
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO))
f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO))
if err != nil {
return fps, err
}
@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}(time.Now())
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName)))
raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName)))
if err != nil {
return
}
@ -779,7 +779,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}(time.Now())
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f)))
raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f)))
if err != nil {
return
}

View file

@ -153,7 +153,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
if !keyDropped {
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
key := coding.NewPBEncoder(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true
}
@ -165,14 +165,14 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
// If the number of pending writes equals the target group size
case len(pendingSamples) == p.MinimumGroupSize:
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
key := coding.NewPBEncoder(newSampleKey.ToDTO())
value := coding.NewPBEncoder(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
if len(sampleValues) > 0 {
if !keyDropped {
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
key := coding.NewPBEncoder(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true
}
@ -190,7 +190,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
if !keyDropped {
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
key := coding.NewPBEncoder(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true
}
@ -211,8 +211,8 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
if len(sampleValues) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(sampleValues, pendingSamples...)
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
key := coding.NewPBEncoder(newSampleKey.ToDTO())
value := coding.NewPBEncoder(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingSamples = model.Values{}
pendingMutations++
@ -320,14 +320,14 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist
pendingBatch = nil
case !sampleKey.MayContain(stopAt):
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
key := coding.NewPBEncoder(sampleKey.ToDTO())
pendingBatch.Drop(key)
lastCurated = sampleKey.LastTimestamp
sampleValues = model.Values{}
pendingMutations++
case sampleKey.MayContain(stopAt):
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
key := coding.NewPBEncoder(sampleKey.ToDTO())
pendingBatch.Drop(key)
pendingMutations++
@ -335,8 +335,8 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist
if len(sampleValues) > 0 {
sampleKey = sampleValues.ToSampleKey(fingerprint)
lastCurated = sampleKey.FirstTimestamp
newKey := coding.NewProtocolBuffer(sampleKey.ToDTO())
newValue := coding.NewProtocolBuffer(sampleValues.ToDTO())
newKey := coding.NewPBEncoder(sampleKey.ToDTO())
newValue := coding.NewPBEncoder(sampleValues.ToDTO())
pendingBatch.Put(newKey, newValue)
pendingMutations++
} else {

View file

@ -61,14 +61,14 @@ func (c curationState) Get() (key, value coding.Encoder) {
if err != nil {
panic(err)
}
key = coding.NewProtocolBuffer(model.CurationKey{
key = coding.NewPBEncoder(model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint),
ProcessorMessageRaw: signature,
ProcessorMessageTypeName: c.processor.Name(),
IgnoreYoungerThan: c.ignoreYoungerThan,
}.ToDTO())
value = coding.NewProtocolBuffer(model.CurationRemark{
value = coding.NewPBEncoder(model.CurationRemark{
LastCompletionTimestamp: c.lastCurated,
}.ToDTO())
@ -76,20 +76,20 @@ func (c curationState) Get() (key, value coding.Encoder) {
}
func (w watermarkState) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
return
}
func (s sampleGroup) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(model.SampleKey{
key = coding.NewPBEncoder(model.SampleKey{
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint),
FirstTimestamp: s.values[0].Timestamp,
LastTimestamp: s.values[len(s.values)-1].Timestamp,
SampleCount: uint32(len(s.values)),
}.ToDTO())
value = coding.NewProtocolBuffer(s.values.ToDTO())
value = coding.NewPBEncoder(s.values.ToDTO())
return
}

View file

@ -194,10 +194,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i
Timestamp: indexable.EncodeTime(i.OldestInclusive),
}
e, err := coding.NewProtocolBuffer(k).Encode()
if err != nil {
return
}
e := coding.NewPBEncoder(k).MustEncode()
iterator := l.MetricSamples.NewIterator(true)
defer iterator.Close()

View file

@ -488,7 +488,7 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
}
// Try seeking to target key.
rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode()
rawKey := coding.NewPBEncoder(targetKey).MustEncode()
iterator.Seek(rawKey)
foundKey, err := extractSampleKey(iterator)

View file

@ -15,13 +15,18 @@ package leveldb
import (
"github.com/prometheus/prometheus/coding"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
type indexValue struct{}
func (i *indexValue) MustEncode() []byte {
return []byte{}
}
var (
existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{})
existenceValue = &indexValue{}
)
type LevelDBMembershipIndex struct {

View file

@ -32,24 +32,15 @@ func NewBatch() *batch {
}
func (b *batch) Drop(key coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
keyEncoded := key.MustEncode()
b.drops++
b.batch.Delete(keyEncoded)
}
func (b *batch) Put(key, value coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
valueEncoded, err := value.Encode()
if err != nil {
panic(err)
}
keyEncoded := key.MustEncode()
valueEncoded := value.MustEncode()
b.puts++
b.batch.Put(keyEncoded, valueEncoded)

View file

@ -251,10 +251,7 @@ func (l *LevelDBPersistence) Close() {
}
func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) {
key, err := value.Encode()
if err != nil {
return
}
key := value.MustEncode()
return l.storage.Get(l.readOptions, key)
}
@ -271,26 +268,16 @@ func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) {
}
func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) {
key, err := value.Encode()
if err != nil {
return
}
key := value.MustEncode()
err = l.storage.Delete(l.writeOptions, key)
return
}
func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) {
keyEncoded, err := key.Encode()
if err != nil {
return
}
keyEncoded := key.MustEncode()
valueEncoded, err := value.Encode()
if err != nil {
return
}
valueEncoded := value.MustEncode()
err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded)