diff --git a/coding/protocol_buffer.go b/coding/pbencoder.go similarity index 62% rename from coding/protocol_buffer.go rename to coding/pbencoder.go index 543344fee7..8fdfaa897a 100644 --- a/coding/protocol_buffer.go +++ b/coding/pbencoder.go @@ -18,14 +18,12 @@ import ( "fmt" ) -type ProtocolBuffer struct { +type PBEncoder struct { message proto.Message } -func (p ProtocolBuffer) MustEncode() []byte { +func (p PBEncoder) MustEncode() []byte { raw, err := proto.Marshal(p.message) - - // XXX: Adjust legacy users of this to not check for error. if err != nil { panic(err) } @@ -33,12 +31,13 @@ func (p ProtocolBuffer) MustEncode() []byte { return raw } -func (p ProtocolBuffer) String() string { - return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message) +func (p PBEncoder) String() string { + return fmt.Sprintf("PBEncoder of %T", p.message) } -func NewProtocolBuffer(message proto.Message) *ProtocolBuffer { - return &ProtocolBuffer{ - message: message, - } +// BUG(matt): Replace almost all calls to this with mechanisms that wrap the +// underlying protocol buffers with business logic types that simply encode +// themselves. If all points are done, then we'll no longer need this type. +func NewPBEncoder(m proto.Message) PBEncoder { + return PBEncoder{message: m} } diff --git a/config/config.go b/config/config.go index d5061dd04b..747c1f2dbe 100644 --- a/config/config.go +++ b/config/config.go @@ -83,14 +83,13 @@ func (c Config) Validate() error { } // GetJobByName finds a job by its name in a Config object. -func (c Config) GetJobByName(name string) (jobConfig *JobConfig) { +func (c Config) GetJobByName(name string) *JobConfig { for _, job := range c.Job { if job.GetName() == name { - jobConfig = &JobConfig{*job} - break + return &JobConfig{*job} } } - return + return nil } // Jobs returns all the jobs in a Config object. diff --git a/config/load.go b/config/load.go index 28d49da417..983961a294 100644 --- a/config/load.go +++ b/config/load.go @@ -19,10 +19,10 @@ import ( "io/ioutil" ) -func LoadFromString(configStr string) (config Config, err error) { +func LoadFromString(configStr string) (Config, error) { configProto := pb.PrometheusConfig{} - if err = proto.UnmarshalText(configStr, &configProto); err != nil { - return + if err := proto.UnmarshalText(configStr, &configProto); err != nil { + return Config{}, err } if configProto.Global == nil { configProto.Global = &pb.GlobalConfig{} @@ -32,17 +32,18 @@ func LoadFromString(configStr string) (config Config, err error) { job.ScrapeInterval = proto.String(configProto.Global.GetScrapeInterval()) } } - config = Config{configProto} - err = config.Validate() - return + + config := Config{configProto} + err := config.Validate() + + return config, err } -func LoadFromFile(fileName string) (config Config, err error) { +func LoadFromFile(fileName string) (Config, error) { configStr, err := ioutil.ReadFile(fileName) if err != nil { - return + return Config{}, err } - config, err = LoadFromString(string(configStr)) - return + return LoadFromString(string(configStr)) } diff --git a/model/curation.go b/model/curation.go index c7bb8594a8..dc70cd8d21 100644 --- a/model/curation.go +++ b/model/curation.go @@ -67,16 +67,16 @@ type CurationKey struct { } // Equal answers whether the two CurationKeys are equivalent. -func (c CurationKey) Equal(o CurationKey) (equal bool) { +func (c CurationKey) Equal(o CurationKey) bool { switch { case !c.Fingerprint.Equal(o.Fingerprint): - return + return false case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0: - return + return false case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName: - return + return false case c.IgnoreYoungerThan != o.IgnoreYoungerThan: - return + return false } return true diff --git a/model/fingerprinting.go b/model/fingerprinting.go index 123e1292aa..cdd0a6bbb4 100644 --- a/model/fingerprinting.go +++ b/model/fingerprinting.go @@ -45,7 +45,7 @@ type Fingerprint interface { } // Builds a Fingerprint from a row key. -func NewFingerprintFromRowKey(rowKey string) (f Fingerprint) { +func NewFingerprintFromRowKey(rowKey string) Fingerprint { components := strings.Split(rowKey, rowKeyDelimiter) hash, err := strconv.ParseUint(components[0], 10, 64) if err != nil { @@ -70,7 +70,7 @@ func NewFingerprintFromDTO(f *dto.Fingerprint) Fingerprint { } // Decomposes a Metric into a Fingerprint. -func NewFingerprintFromMetric(metric Metric) (f Fingerprint) { +func NewFingerprintFromMetric(metric Metric) Fingerprint { labelLength := len(metric) labelNames := make([]string, 0, labelLength) @@ -184,7 +184,7 @@ func (f Fingerprints) Len() int { return len(f) } -func (f Fingerprints) Less(i, j int) (less bool) { +func (f Fingerprints) Less(i, j int) bool { return f[i].Less(f[j]) } diff --git a/model/labelpair.go b/model/labelpair.go index 12c5c32105..c6e34a446b 100644 --- a/model/labelpair.go +++ b/model/labelpair.go @@ -24,7 +24,7 @@ func (l LabelPairs) Len() int { return len(l) } -func (l LabelPairs) Less(i, j int) (less bool) { +func (l LabelPairs) Less(i, j int) bool { if l[i].Name < l[j].Name { return true } diff --git a/model/metric.go b/model/metric.go index 12c5094da3..892e620374 100644 --- a/model/metric.go +++ b/model/metric.go @@ -82,12 +82,12 @@ func (l LabelSet) String() string { return buffer.String() } -func (l LabelSet) ToMetric() (metric Metric) { - metric = Metric{} +func (l LabelSet) ToMetric() Metric { + metric := Metric{} for label, value := range l { metric[label] = value } - return + return metric } // A Metric is similar to a LabelSet, but the key difference is that a Metric is @@ -168,20 +168,17 @@ func (v Values) LastTimeBefore(t time.Time) bool { // InsideInterval indicates whether a given range of sorted values could contain // a value for a given time. -func (v Values) InsideInterval(t time.Time) (s bool) { - if v.Len() == 0 { - return +func (v Values) InsideInterval(t time.Time) bool { + switch { + case v.Len() == 0: + return false + case t.Before(v[0].Timestamp): + return false + case !v[v.Len()-1].Timestamp.Before(t): + return false + default: + return true } - - if t.Before(v[0].Timestamp) { - return - } - - if !v[v.Len()-1].Timestamp.Before(t) { - return - } - - return true } // TruncateBefore returns a subslice of the original such that extraneous @@ -239,7 +236,7 @@ func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) { }) } - return + return v } type SampleSet struct { diff --git a/model/sample.go b/model/sample.go index b29b19cd8e..13e5840d8d 100644 --- a/model/sample.go +++ b/model/sample.go @@ -43,16 +43,15 @@ func (s Samples) Len() int { return len(s) } -func (s Samples) Less(i, j int) (less bool) { - if NewFingerprintFromMetric(s[i].Metric).Less(NewFingerprintFromMetric(s[j].Metric)) { +func (s Samples) Less(i, j int) bool { + switch { + case NewFingerprintFromMetric(s[i].Metric).Less(NewFingerprintFromMetric(s[j].Metric)): return true - } - - if s[i].Timestamp.Before(s[j].Timestamp) { + case s[i].Timestamp.Before(s[j].Timestamp): return true + default: + return false } - - return false } func (s Samples) Swap(i, j int) { diff --git a/model/samplekey.go b/model/samplekey.go index 103fefecd5..39ac893859 100644 --- a/model/samplekey.go +++ b/model/samplekey.go @@ -33,15 +33,15 @@ type SampleKey struct { // MayContain indicates whether the given SampleKey could potentially contain a // value at the provided time. Even if true is emitted, that does not mean a // satisfactory value, in fact, exists. -func (s SampleKey) MayContain(t time.Time) (could bool) { +func (s SampleKey) MayContain(t time.Time) bool { switch { case t.Before(s.FirstTimestamp): - return + return false case t.After(s.LastTimestamp): - return + return false + default: + return true } - - return true } // ToDTO converts this SampleKey into a DTO for use in serialization purposes. diff --git a/retrieval/format/discriminator.go b/retrieval/format/discriminator.go index 5c7f8f8cff..be874be850 100644 --- a/retrieval/format/discriminator.go +++ b/retrieval/format/discriminator.go @@ -34,22 +34,19 @@ type Registry interface { type registry struct { } -func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Processor, err error) { +func (r *registry) ProcessorForRequestHeader(header http.Header) (Processor, error) { if header == nil { - err = fmt.Errorf("Received illegal and nil header.") - return + return nil, fmt.Errorf("Received illegal and nil header.") } mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) if err != nil { - err = fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err) - return + return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err) } if mediatype != "application/json" { - err = fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json") - return + return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json") } var prometheusApiVersion string @@ -62,13 +59,10 @@ func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Proc switch prometheusApiVersion { case "0.0.2": - processor = Processor002 - return + return Processor002, nil case "0.0.1": - processor = Processor001 - return + return Processor001, nil default: - err = fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) - return + return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) } } diff --git a/retrieval/format/processor0_0_1.go b/retrieval/format/processor0_0_1.go index 59e8475215..9573a36337 100644 --- a/retrieval/format/processor0_0_1.go +++ b/retrieval/format/processor0_0_1.go @@ -58,20 +58,19 @@ type entity001 []struct { } `json:"metric"` } -func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) { +func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error { // TODO(matt): Replace with plain-jane JSON unmarshalling. defer stream.Close() buffer, err := ioutil.ReadAll(stream) if err != nil { - return + return err } entities := entity001{} - err = json.Unmarshal(buffer, &entities) - if err != nil { - return + if err = json.Unmarshal(buffer, &entities); err != nil { + return err } // TODO(matt): This outer loop is a great basis for parallelization. @@ -140,7 +139,6 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa } break - default: } } } @@ -148,5 +146,5 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa results <- Result{Samples: pendingSamples} } - return + return nil } diff --git a/retrieval/target.go b/retrieval/target.go index 31ad6a1bf4..6d50f287bc 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -177,7 +177,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err t.scheduler.Reschedule(earliest, futureState) t.state = futureState - return + return err } func (t *target) scrape(timestamp time.Time, results chan format.Result) (err error) { @@ -194,13 +194,13 @@ func (t *target) scrape(timestamp time.Time, results chan format.Result) (err er resp, err := t.client.Get(t.Address()) if err != nil { - return + return err } defer resp.Body.Close() processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header) if err != nil { - return + return err } // XXX: This is a wart; we need to handle this more gracefully down the diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 38518c18db..8ccb40d784 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -53,7 +53,7 @@ func (m *targetManager) release() { <-m.requestAllowance } -func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) (targetPool *TargetPool) { +func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) *TargetPool { targetPool, ok := m.poolsByJob[job.GetName()] if !ok { @@ -62,9 +62,11 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInte interval := job.ScrapeInterval() m.poolsByJob[job.GetName()] = targetPool + // BUG(all): Investigate whether this auto-goroutine creation is desired. go targetPool.Run(m.results, interval) } - return + + return targetPool } func (m *targetManager) AddTarget(job config.JobConfig, t Target, defaultScrapeInterval time.Duration) { diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index a6b43b9de5..32e0cf606a 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -32,7 +32,7 @@ type TargetPool struct { replaceTargetsQueue chan []Target } -func NewTargetPool(m TargetManager) (p *TargetPool) { +func NewTargetPool(m TargetManager) *TargetPool { return &TargetPool{ manager: m, addTargetQueue: make(chan Target), @@ -151,7 +151,7 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration) } -// XXX: Not really thread-safe. Only used in /status page for now. +// BUG(all): Not really thread-safe. Only used in /status page for now. func (p *TargetPool) Targets() []Target { return p.targets } diff --git a/rules/alerting.go b/rules/alerting.go index 7b259ebebd..d7d56d58a5 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -25,14 +25,15 @@ import ( // States that active alerts can be in. type alertState int -func (s alertState) String() (state string) { +func (s alertState) String() string { switch s { case PENDING: - state = "pending" + return "pending" case FIRING: - state = "firing" + return "firing" + default: + panic("undefined") } - return } const ( @@ -88,15 +89,15 @@ type AlertingRule struct { func (rule AlertingRule) Name() string { return rule.name } -func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { +func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { return ast.EvalVectorInstant(rule.vector, timestamp, storage) } -func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { +func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { // Get the raw value of the rule expression. exprResult, err := rule.EvalRaw(timestamp, storage) if err != nil { - return + return nil, err } // Create pending alerts for any new vector elements in the alert expression. @@ -115,6 +116,8 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage } } + vector := ast.Vector{} + // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, activeAlert := range rule.activeAlerts { if !resultFingerprints.Has(fp) { @@ -130,7 +133,8 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage vector = append(vector, activeAlert.sample(timestamp, 1)) } - return + + return vector, nil } func (rule AlertingRule) ToDotGraph() string { diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 94d25b7f7d..814f88d6d4 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -269,14 +269,14 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.Tie return } -func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (matrix Matrix, err error) { +func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (Matrix, error) { // Explicitly initialize to an empty matrix since a nil Matrix encodes to // null in JSON. - matrix = Matrix{} + matrix := Matrix{} viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage) if err != nil { - return + return nil, err } // TODO implement watchdog timer for long-running queries. sampleSets := map[string]*model.SampleSet{} @@ -302,7 +302,8 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t for _, sampleSet := range sampleSets { matrix = append(matrix, *sampleSet) } - return + + return matrix, nil } func labelIntersection(metric1, metric2 model.Metric) model.Metric { diff --git a/rules/ast/functions.go b/rules/ast/functions.go index 2c833c500f..a392bb7360 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -146,7 +146,7 @@ func (sorter vectorByValueSorter) Len() int { return len(sorter.vector) } -func (sorter vectorByValueSorter) Less(i, j int) (less bool) { +func (sorter vectorByValueSorter) Less(i, j int) bool { return sorter.vector[i].Value < sorter.vector[j].Value } diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 973364eb8e..391bc30960 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -60,7 +60,7 @@ func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *m // surrounding a given target time. If samples are found both before and after // the target time, the sample value is interpolated between these. Otherwise, // the single closest sample is returned verbatim. -func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) (sample *model.SamplePair) { +func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) *model.SamplePair { var closestBefore *model.SamplePair var closestAfter *model.SamplePair for _, candidate := range samples { @@ -96,14 +96,12 @@ func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.T switch { case closestBefore != nil && closestAfter != nil: - sample = interpolateSamples(closestBefore, closestAfter, timestamp) + return interpolateSamples(closestBefore, closestAfter, timestamp) case closestBefore != nil: - sample = closestBefore + return closestBefore default: - sample = closestAfter + return closestAfter } - - return } func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) { @@ -122,7 +120,7 @@ func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp }) } } - return + return samples, err } func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) { diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index 549ea97908..42200c8047 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -95,7 +95,7 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) { } } -func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) { +func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (*viewAdapter, error) { analyzer := NewQueryAnalyzer(storage) analyzer.AnalyzeQueries(node) viewBuilder := metric.NewViewRequestBuilder() @@ -106,13 +106,13 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric. viewBuilder.GetMetricAtTime(fingerprint, timestamp) } view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second) - if err == nil { - viewAdapter = NewViewAdapter(view, storage) + if err != nil { + return nil, err } - return + return NewViewAdapter(view, storage), nil } -func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) { +func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (*viewAdapter, error) { analyzer := NewQueryAnalyzer(storage) analyzer.AnalyzeQueries(node) viewBuilder := metric.NewViewRequestBuilder() @@ -126,8 +126,8 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval) } view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second) - if err == nil { - viewAdapter = NewViewAdapter(view, storage) + if err != nil { + return nil, err } - return + return NewViewAdapter(view, storage), nil } diff --git a/rules/recording.go b/rules/recording.go index 62984332e1..6872b95be2 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -31,15 +31,15 @@ type RecordingRule struct { func (rule RecordingRule) Name() string { return rule.name } -func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { +func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { return ast.EvalVectorInstant(rule.vector, timestamp, storage) } -func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { +func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) { // Get the raw value of the rule expression. - vector, err = rule.EvalRaw(timestamp, storage) + vector, err := rule.EvalRaw(timestamp, storage) if err != nil { - return + return nil, err } // Override the metric name and labels. @@ -53,7 +53,8 @@ func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorag } } } - return + + return vector, nil } func (rule RecordingRule) ToDotGraph() string { diff --git a/rules/rules.go b/rules/rules.go index 553fb30ace..bbfb69d6aa 100644 --- a/rules/rules.go +++ b/rules/rules.go @@ -26,9 +26,9 @@ type Rule interface { Name() string // EvalRaw evaluates the rule's vector expression without triggering any // other actions, like recording or alerting. - EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) + EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) // Eval evaluates the rule, including any associated recording or alerting actions. - Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) + Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) // ToDotGraph returns a Graphviz dot graph of the rule. ToDotGraph() string } diff --git a/storage/metric/curator.go b/storage/metric/curator.go index e6ee7c704a..33010b6893 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -220,7 +220,7 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge }.ToDTO() curationValue := &dto.CurationValue{} - rawKey := coding.NewProtocolBuffer(curationKey) + rawKey := coding.NewPBEncoder(curationKey) has, err := states.Has(rawKey) if err != nil { @@ -341,7 +341,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr FirstTimestamp: seriesFrontier.optimalStartTime(curationState), } - prospectiveKey := coding.NewProtocolBuffer(startKey.ToDTO()).MustEncode() + prospectiveKey := coding.NewPBEncoder(startKey.ToDTO()).MustEncode() if !w.sampleIterator.Seek(prospectiveKey) { // LevelDB is picky about the seek ranges. If an iterator was invalidated, // no work may occur, and the iterator cannot be recovered. @@ -386,7 +386,7 @@ func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished t LastCompletionTimestamp: finished, }.ToDTO() - err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue)) + err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue)) return } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index bdb5f87cdd..276eb5788f 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -115,7 +115,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) Timestamp: upperSeek, } - raw := coding.NewProtocolBuffer(key).MustEncode() + raw := coding.NewPBEncoder(key).MustEncode() i.Seek(raw) if i.Key() == nil { @@ -157,7 +157,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) key.Timestamp = lowerSeek - raw = coding.NewProtocolBuffer(key).MustEncode() + raw = coding.NewPBEncoder(key).MustEncode() i.Seek(raw) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index a6ce87e970..902c3ed842 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -305,7 +305,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) + batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) } err = l.labelNameToFingerprints.Commit(batch) @@ -377,7 +377,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint value.Member = append(value.Member, fingerprint.ToDTO()) } - batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) + batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) } err = l.labelSetToFingerprints.Commit(batch) @@ -403,8 +403,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri defer batch.Close() for fingerprint, metric := range metrics { - key := coding.NewProtocolBuffer(fingerprint.ToDTO()) - value := coding.NewProtocolBuffer(model.MetricToDTO(metric)) + key := coding.NewPBEncoder(fingerprint.ToDTO()) + value := coding.NewPBEncoder(model.MetricToDTO(metric)) batch.Put(key, value) } @@ -469,7 +469,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // WART: We should probably encode simple fingerprints. for _, metric := range absentMetrics { - key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) + key := coding.NewPBEncoder(model.MetricToDTO(metric)) batch.Put(key, key) } @@ -498,7 +498,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger value := &dto.MetricHighWatermark{} raw := []byte{} newestSampleTimestamp := samples[len(samples)-1].Timestamp - keyEncoded := coding.NewProtocolBuffer(key) + keyEncoded := coding.NewPBEncoder(key) key.Signature = proto.String(fingerprint.ToRowKey()) raw, err = l.MetricHighWatermarks.Get(keyEncoded) @@ -517,7 +517,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger } } value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(keyEncoded, coding.NewProtocolBuffer(value)) + batch.Put(keyEncoded, coding.NewPBEncoder(value)) mutationCount++ } @@ -592,7 +592,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err }) } - samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value)) + samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) } } @@ -665,7 +665,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }(time.Now()) - dtoKey := coding.NewProtocolBuffer(dto) + dtoKey := coding.NewPBEncoder(dto) value, err = l.metricMembershipIndex.Has(dtoKey) return @@ -678,7 +678,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }(time.Now()) - dtoKey := coding.NewProtocolBuffer(dto) + dtoKey := coding.NewPBEncoder(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) return @@ -691,7 +691,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }(time.Now()) - dtoKey := coding.NewProtocolBuffer(dto) + dtoKey := coding.NewPBEncoder(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) return @@ -707,7 +707,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab sets := []utility.Set{} for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { - f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO)) + f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO)) if err != nil { return fps, err } @@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }(time.Now()) - raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) + raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName))) if err != nil { return } @@ -779,7 +779,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }(time.Now()) - raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) + raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f))) if err != nil { return } diff --git a/storage/metric/processor.go b/storage/metric/processor.go index f3fcf7b6c9..3968f5a5b9 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -153,7 +153,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize: if !keyDropped { - key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) keyDropped = true } @@ -165,14 +165,14 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi // If the number of pending writes equals the target group size case len(pendingSamples) == p.MinimumGroupSize: newSampleKey := pendingSamples.ToSampleKey(fingerprint) - key := coding.NewProtocolBuffer(newSampleKey.ToDTO()) - value := coding.NewProtocolBuffer(pendingSamples.ToDTO()) + key := coding.NewPBEncoder(newSampleKey.ToDTO()) + value := coding.NewPBEncoder(pendingSamples.ToDTO()) pendingBatch.Put(key, value) pendingMutations++ lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) if len(sampleValues) > 0 { if !keyDropped { - key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) keyDropped = true } @@ -190,7 +190,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize: if !keyDropped { - key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) keyDropped = true } @@ -211,8 +211,8 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi if len(sampleValues) > 0 || len(pendingSamples) > 0 { pendingSamples = append(sampleValues, pendingSamples...) newSampleKey := pendingSamples.ToSampleKey(fingerprint) - key := coding.NewProtocolBuffer(newSampleKey.ToDTO()) - value := coding.NewProtocolBuffer(pendingSamples.ToDTO()) + key := coding.NewPBEncoder(newSampleKey.ToDTO()) + value := coding.NewPBEncoder(pendingSamples.ToDTO()) pendingBatch.Put(key, value) pendingSamples = model.Values{} pendingMutations++ @@ -320,14 +320,14 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist pendingBatch = nil case !sampleKey.MayContain(stopAt): - key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) lastCurated = sampleKey.LastTimestamp sampleValues = model.Values{} pendingMutations++ case sampleKey.MayContain(stopAt): - key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) pendingMutations++ @@ -335,8 +335,8 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist if len(sampleValues) > 0 { sampleKey = sampleValues.ToSampleKey(fingerprint) lastCurated = sampleKey.FirstTimestamp - newKey := coding.NewProtocolBuffer(sampleKey.ToDTO()) - newValue := coding.NewProtocolBuffer(sampleValues.ToDTO()) + newKey := coding.NewPBEncoder(sampleKey.ToDTO()) + newValue := coding.NewPBEncoder(sampleValues.ToDTO()) pendingBatch.Put(newKey, newValue) pendingMutations++ } else { diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index e534f0ee3a..2eeb12250e 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -61,14 +61,14 @@ func (c curationState) Get() (key, value coding.Encoder) { if err != nil { panic(err) } - key = coding.NewProtocolBuffer(model.CurationKey{ + key = coding.NewPBEncoder(model.CurationKey{ Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint), ProcessorMessageRaw: signature, ProcessorMessageTypeName: c.processor.Name(), IgnoreYoungerThan: c.ignoreYoungerThan, }.ToDTO()) - value = coding.NewProtocolBuffer(model.CurationRemark{ + value = coding.NewPBEncoder(model.CurationRemark{ LastCompletionTimestamp: c.lastCurated, }.ToDTO()) @@ -76,20 +76,20 @@ func (c curationState) Get() (key, value coding.Encoder) { } func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) return } func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBuffer(model.SampleKey{ + key = coding.NewPBEncoder(model.SampleKey{ Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint), FirstTimestamp: s.values[0].Timestamp, LastTimestamp: s.values[len(s.values)-1].Timestamp, SampleCount: uint32(len(s.values)), }.ToDTO()) - value = coding.NewProtocolBuffer(s.values.ToDTO()) + value = coding.NewPBEncoder(s.values.ToDTO()) return } diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 482294a5bc..6b95744c9c 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -194,7 +194,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e := coding.NewProtocolBuffer(k).MustEncode() + e := coding.NewPBEncoder(k).MustEncode() iterator := l.MetricSamples.NewIterator(true) defer iterator.Close() diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index dacfb13c3e..9daee66c0f 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -488,7 +488,7 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier } // Try seeking to target key. - rawKey := coding.NewProtocolBuffer(targetKey).MustEncode() + rawKey := coding.NewPBEncoder(targetKey).MustEncode() iterator.Seek(rawKey) foundKey, err := extractSampleKey(iterator) diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 01a72a66b8..99590eee69 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -15,13 +15,18 @@ package leveldb import ( "github.com/prometheus/prometheus/coding" - dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" ) +type indexValue struct{} + +func (i *indexValue) MustEncode() []byte { + return []byte{} +} + var ( - existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{}) + existenceValue = &indexValue{} ) type LevelDBMembershipIndex struct {