mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Destroy naked returns in half of corpus.
The use of naked return values is frowned upon. This is the first of two bulk updates to remove them.
This commit is contained in:
parent
4e0c932a4f
commit
8f4c7ece92
|
@ -18,14 +18,12 @@ import (
|
|||
"fmt"
|
||||
)
|
||||
|
||||
type ProtocolBuffer struct {
|
||||
type PBEncoder struct {
|
||||
message proto.Message
|
||||
}
|
||||
|
||||
func (p ProtocolBuffer) MustEncode() []byte {
|
||||
func (p PBEncoder) MustEncode() []byte {
|
||||
raw, err := proto.Marshal(p.message)
|
||||
|
||||
// XXX: Adjust legacy users of this to not check for error.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -33,12 +31,13 @@ func (p ProtocolBuffer) MustEncode() []byte {
|
|||
return raw
|
||||
}
|
||||
|
||||
func (p ProtocolBuffer) String() string {
|
||||
return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message)
|
||||
func (p PBEncoder) String() string {
|
||||
return fmt.Sprintf("PBEncoder of %T", p.message)
|
||||
}
|
||||
|
||||
func NewProtocolBuffer(message proto.Message) *ProtocolBuffer {
|
||||
return &ProtocolBuffer{
|
||||
message: message,
|
||||
}
|
||||
// BUG(matt): Replace almost all calls to this with mechanisms that wrap the
|
||||
// underlying protocol buffers with business logic types that simply encode
|
||||
// themselves. If all points are done, then we'll no longer need this type.
|
||||
func NewPBEncoder(m proto.Message) PBEncoder {
|
||||
return PBEncoder{message: m}
|
||||
}
|
|
@ -83,14 +83,13 @@ func (c Config) Validate() error {
|
|||
}
|
||||
|
||||
// GetJobByName finds a job by its name in a Config object.
|
||||
func (c Config) GetJobByName(name string) (jobConfig *JobConfig) {
|
||||
func (c Config) GetJobByName(name string) *JobConfig {
|
||||
for _, job := range c.Job {
|
||||
if job.GetName() == name {
|
||||
jobConfig = &JobConfig{*job}
|
||||
break
|
||||
return &JobConfig{*job}
|
||||
}
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Jobs returns all the jobs in a Config object.
|
||||
|
|
|
@ -19,10 +19,10 @@ import (
|
|||
"io/ioutil"
|
||||
)
|
||||
|
||||
func LoadFromString(configStr string) (config Config, err error) {
|
||||
func LoadFromString(configStr string) (Config, error) {
|
||||
configProto := pb.PrometheusConfig{}
|
||||
if err = proto.UnmarshalText(configStr, &configProto); err != nil {
|
||||
return
|
||||
if err := proto.UnmarshalText(configStr, &configProto); err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
if configProto.Global == nil {
|
||||
configProto.Global = &pb.GlobalConfig{}
|
||||
|
@ -32,17 +32,18 @@ func LoadFromString(configStr string) (config Config, err error) {
|
|||
job.ScrapeInterval = proto.String(configProto.Global.GetScrapeInterval())
|
||||
}
|
||||
}
|
||||
config = Config{configProto}
|
||||
err = config.Validate()
|
||||
return
|
||||
|
||||
config := Config{configProto}
|
||||
err := config.Validate()
|
||||
|
||||
return config, err
|
||||
}
|
||||
|
||||
func LoadFromFile(fileName string) (config Config, err error) {
|
||||
func LoadFromFile(fileName string) (Config, error) {
|
||||
configStr, err := ioutil.ReadFile(fileName)
|
||||
if err != nil {
|
||||
return
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
config, err = LoadFromString(string(configStr))
|
||||
return
|
||||
return LoadFromString(string(configStr))
|
||||
}
|
||||
|
|
|
@ -67,16 +67,16 @@ type CurationKey struct {
|
|||
}
|
||||
|
||||
// Equal answers whether the two CurationKeys are equivalent.
|
||||
func (c CurationKey) Equal(o CurationKey) (equal bool) {
|
||||
func (c CurationKey) Equal(o CurationKey) bool {
|
||||
switch {
|
||||
case !c.Fingerprint.Equal(o.Fingerprint):
|
||||
return
|
||||
return false
|
||||
case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
|
||||
return
|
||||
return false
|
||||
case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
|
||||
return
|
||||
return false
|
||||
case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
|
|
|
@ -45,7 +45,7 @@ type Fingerprint interface {
|
|||
}
|
||||
|
||||
// Builds a Fingerprint from a row key.
|
||||
func NewFingerprintFromRowKey(rowKey string) (f Fingerprint) {
|
||||
func NewFingerprintFromRowKey(rowKey string) Fingerprint {
|
||||
components := strings.Split(rowKey, rowKeyDelimiter)
|
||||
hash, err := strconv.ParseUint(components[0], 10, 64)
|
||||
if err != nil {
|
||||
|
@ -70,7 +70,7 @@ func NewFingerprintFromDTO(f *dto.Fingerprint) Fingerprint {
|
|||
}
|
||||
|
||||
// Decomposes a Metric into a Fingerprint.
|
||||
func NewFingerprintFromMetric(metric Metric) (f Fingerprint) {
|
||||
func NewFingerprintFromMetric(metric Metric) Fingerprint {
|
||||
labelLength := len(metric)
|
||||
labelNames := make([]string, 0, labelLength)
|
||||
|
||||
|
@ -184,7 +184,7 @@ func (f Fingerprints) Len() int {
|
|||
return len(f)
|
||||
}
|
||||
|
||||
func (f Fingerprints) Less(i, j int) (less bool) {
|
||||
func (f Fingerprints) Less(i, j int) bool {
|
||||
return f[i].Less(f[j])
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ func (l LabelPairs) Len() int {
|
|||
return len(l)
|
||||
}
|
||||
|
||||
func (l LabelPairs) Less(i, j int) (less bool) {
|
||||
func (l LabelPairs) Less(i, j int) bool {
|
||||
if l[i].Name < l[j].Name {
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -82,12 +82,12 @@ func (l LabelSet) String() string {
|
|||
return buffer.String()
|
||||
}
|
||||
|
||||
func (l LabelSet) ToMetric() (metric Metric) {
|
||||
metric = Metric{}
|
||||
func (l LabelSet) ToMetric() Metric {
|
||||
metric := Metric{}
|
||||
for label, value := range l {
|
||||
metric[label] = value
|
||||
}
|
||||
return
|
||||
return metric
|
||||
}
|
||||
|
||||
// A Metric is similar to a LabelSet, but the key difference is that a Metric is
|
||||
|
@ -168,20 +168,17 @@ func (v Values) LastTimeBefore(t time.Time) bool {
|
|||
|
||||
// InsideInterval indicates whether a given range of sorted values could contain
|
||||
// a value for a given time.
|
||||
func (v Values) InsideInterval(t time.Time) (s bool) {
|
||||
if v.Len() == 0 {
|
||||
return
|
||||
func (v Values) InsideInterval(t time.Time) bool {
|
||||
switch {
|
||||
case v.Len() == 0:
|
||||
return false
|
||||
case t.Before(v[0].Timestamp):
|
||||
return false
|
||||
case !v[v.Len()-1].Timestamp.Before(t):
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
if t.Before(v[0].Timestamp) {
|
||||
return
|
||||
}
|
||||
|
||||
if !v[v.Len()-1].Timestamp.Before(t) {
|
||||
return
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// TruncateBefore returns a subslice of the original such that extraneous
|
||||
|
@ -239,7 +236,7 @@ func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
|
|||
})
|
||||
}
|
||||
|
||||
return
|
||||
return v
|
||||
}
|
||||
|
||||
type SampleSet struct {
|
||||
|
|
|
@ -43,16 +43,15 @@ func (s Samples) Len() int {
|
|||
return len(s)
|
||||
}
|
||||
|
||||
func (s Samples) Less(i, j int) (less bool) {
|
||||
if NewFingerprintFromMetric(s[i].Metric).Less(NewFingerprintFromMetric(s[j].Metric)) {
|
||||
func (s Samples) Less(i, j int) bool {
|
||||
switch {
|
||||
case NewFingerprintFromMetric(s[i].Metric).Less(NewFingerprintFromMetric(s[j].Metric)):
|
||||
return true
|
||||
}
|
||||
|
||||
if s[i].Timestamp.Before(s[j].Timestamp) {
|
||||
case s[i].Timestamp.Before(s[j].Timestamp):
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (s Samples) Swap(i, j int) {
|
||||
|
|
|
@ -33,15 +33,15 @@ type SampleKey struct {
|
|||
// MayContain indicates whether the given SampleKey could potentially contain a
|
||||
// value at the provided time. Even if true is emitted, that does not mean a
|
||||
// satisfactory value, in fact, exists.
|
||||
func (s SampleKey) MayContain(t time.Time) (could bool) {
|
||||
func (s SampleKey) MayContain(t time.Time) bool {
|
||||
switch {
|
||||
case t.Before(s.FirstTimestamp):
|
||||
return
|
||||
return false
|
||||
case t.After(s.LastTimestamp):
|
||||
return
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// ToDTO converts this SampleKey into a DTO for use in serialization purposes.
|
||||
|
|
|
@ -34,22 +34,19 @@ type Registry interface {
|
|||
type registry struct {
|
||||
}
|
||||
|
||||
func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Processor, err error) {
|
||||
func (r *registry) ProcessorForRequestHeader(header http.Header) (Processor, error) {
|
||||
if header == nil {
|
||||
err = fmt.Errorf("Received illegal and nil header.")
|
||||
return
|
||||
return nil, fmt.Errorf("Received illegal and nil header.")
|
||||
}
|
||||
|
||||
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
|
||||
return
|
||||
return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
|
||||
}
|
||||
|
||||
if mediatype != "application/json" {
|
||||
err = fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
|
||||
return
|
||||
return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
|
||||
}
|
||||
|
||||
var prometheusApiVersion string
|
||||
|
@ -62,13 +59,10 @@ func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Proc
|
|||
|
||||
switch prometheusApiVersion {
|
||||
case "0.0.2":
|
||||
processor = Processor002
|
||||
return
|
||||
return Processor002, nil
|
||||
case "0.0.1":
|
||||
processor = Processor001
|
||||
return
|
||||
return Processor001, nil
|
||||
default:
|
||||
err = fmt.Errorf("Unrecognized API version %s", prometheusApiVersion)
|
||||
return
|
||||
return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,20 +58,19 @@ type entity001 []struct {
|
|||
} `json:"metric"`
|
||||
}
|
||||
|
||||
func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) {
|
||||
func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
|
||||
// TODO(matt): Replace with plain-jane JSON unmarshalling.
|
||||
defer stream.Close()
|
||||
|
||||
buffer, err := ioutil.ReadAll(stream)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
entities := entity001{}
|
||||
|
||||
err = json.Unmarshal(buffer, &entities)
|
||||
if err != nil {
|
||||
return
|
||||
if err = json.Unmarshal(buffer, &entities); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(matt): This outer loop is a great basis for parallelization.
|
||||
|
@ -140,7 +139,6 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
|
|||
}
|
||||
|
||||
break
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -148,5 +146,5 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
|
|||
results <- Result{Samples: pendingSamples}
|
||||
}
|
||||
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -177,7 +177,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
|
|||
t.scheduler.Reschedule(earliest, futureState)
|
||||
t.state = futureState
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *target) scrape(timestamp time.Time, results chan format.Result) (err error) {
|
||||
|
@ -194,13 +194,13 @@ func (t *target) scrape(timestamp time.Time, results chan format.Result) (err er
|
|||
|
||||
resp, err := t.client.Get(t.Address())
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// XXX: This is a wart; we need to handle this more gracefully down the
|
||||
|
|
|
@ -53,7 +53,7 @@ func (m *targetManager) release() {
|
|||
<-m.requestAllowance
|
||||
}
|
||||
|
||||
func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) (targetPool *TargetPool) {
|
||||
func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInterval time.Duration) *TargetPool {
|
||||
targetPool, ok := m.poolsByJob[job.GetName()]
|
||||
|
||||
if !ok {
|
||||
|
@ -62,9 +62,11 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig, defaultScrapeInte
|
|||
|
||||
interval := job.ScrapeInterval()
|
||||
m.poolsByJob[job.GetName()] = targetPool
|
||||
// BUG(all): Investigate whether this auto-goroutine creation is desired.
|
||||
go targetPool.Run(m.results, interval)
|
||||
}
|
||||
return
|
||||
|
||||
return targetPool
|
||||
}
|
||||
|
||||
func (m *targetManager) AddTarget(job config.JobConfig, t Target, defaultScrapeInterval time.Duration) {
|
||||
|
|
|
@ -32,7 +32,7 @@ type TargetPool struct {
|
|||
replaceTargetsQueue chan []Target
|
||||
}
|
||||
|
||||
func NewTargetPool(m TargetManager) (p *TargetPool) {
|
||||
func NewTargetPool(m TargetManager) *TargetPool {
|
||||
return &TargetPool{
|
||||
manager: m,
|
||||
addTargetQueue: make(chan Target),
|
||||
|
@ -151,7 +151,7 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
|
|||
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
|
||||
}
|
||||
|
||||
// XXX: Not really thread-safe. Only used in /status page for now.
|
||||
// BUG(all): Not really thread-safe. Only used in /status page for now.
|
||||
func (p *TargetPool) Targets() []Target {
|
||||
return p.targets
|
||||
}
|
||||
|
|
|
@ -25,14 +25,15 @@ import (
|
|||
// States that active alerts can be in.
|
||||
type alertState int
|
||||
|
||||
func (s alertState) String() (state string) {
|
||||
func (s alertState) String() string {
|
||||
switch s {
|
||||
case PENDING:
|
||||
state = "pending"
|
||||
return "pending"
|
||||
case FIRING:
|
||||
state = "firing"
|
||||
return "firing"
|
||||
default:
|
||||
panic("undefined")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -88,15 +89,15 @@ type AlertingRule struct {
|
|||
|
||||
func (rule AlertingRule) Name() string { return rule.name }
|
||||
|
||||
func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
|
||||
func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
return ast.EvalVectorInstant(rule.vector, timestamp, storage)
|
||||
}
|
||||
|
||||
func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
|
||||
func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
// Get the raw value of the rule expression.
|
||||
exprResult, err := rule.EvalRaw(timestamp, storage)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create pending alerts for any new vector elements in the alert expression.
|
||||
|
@ -115,6 +116,8 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage
|
|||
}
|
||||
}
|
||||
|
||||
vector := ast.Vector{}
|
||||
|
||||
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
|
||||
for fp, activeAlert := range rule.activeAlerts {
|
||||
if !resultFingerprints.Has(fp) {
|
||||
|
@ -130,7 +133,8 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage
|
|||
|
||||
vector = append(vector, activeAlert.sample(timestamp, 1))
|
||||
}
|
||||
return
|
||||
|
||||
return vector, nil
|
||||
}
|
||||
|
||||
func (rule AlertingRule) ToDotGraph() string {
|
||||
|
|
|
@ -269,14 +269,14 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.Tie
|
|||
return
|
||||
}
|
||||
|
||||
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (matrix Matrix, err error) {
|
||||
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (Matrix, error) {
|
||||
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
|
||||
// null in JSON.
|
||||
matrix = Matrix{}
|
||||
matrix := Matrix{}
|
||||
|
||||
viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
// TODO implement watchdog timer for long-running queries.
|
||||
sampleSets := map[string]*model.SampleSet{}
|
||||
|
@ -302,7 +302,8 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t
|
|||
for _, sampleSet := range sampleSets {
|
||||
matrix = append(matrix, *sampleSet)
|
||||
}
|
||||
return
|
||||
|
||||
return matrix, nil
|
||||
}
|
||||
|
||||
func labelIntersection(metric1, metric2 model.Metric) model.Metric {
|
||||
|
|
|
@ -146,7 +146,7 @@ func (sorter vectorByValueSorter) Len() int {
|
|||
return len(sorter.vector)
|
||||
}
|
||||
|
||||
func (sorter vectorByValueSorter) Less(i, j int) (less bool) {
|
||||
func (sorter vectorByValueSorter) Less(i, j int) bool {
|
||||
return sorter.vector[i].Value < sorter.vector[j].Value
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *m
|
|||
// surrounding a given target time. If samples are found both before and after
|
||||
// the target time, the sample value is interpolated between these. Otherwise,
|
||||
// the single closest sample is returned verbatim.
|
||||
func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) (sample *model.SamplePair) {
|
||||
func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) *model.SamplePair {
|
||||
var closestBefore *model.SamplePair
|
||||
var closestAfter *model.SamplePair
|
||||
for _, candidate := range samples {
|
||||
|
@ -96,14 +96,12 @@ func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.T
|
|||
|
||||
switch {
|
||||
case closestBefore != nil && closestAfter != nil:
|
||||
sample = interpolateSamples(closestBefore, closestAfter, timestamp)
|
||||
return interpolateSamples(closestBefore, closestAfter, timestamp)
|
||||
case closestBefore != nil:
|
||||
sample = closestBefore
|
||||
return closestBefore
|
||||
default:
|
||||
sample = closestAfter
|
||||
return closestAfter
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp time.Time) (samples Vector, err error) {
|
||||
|
@ -122,7 +120,7 @@ func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp
|
|||
})
|
||||
}
|
||||
}
|
||||
return
|
||||
return samples, err
|
||||
}
|
||||
|
||||
func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interval *model.Interval) (sampleSets []model.SampleSet, err error) {
|
||||
|
|
|
@ -95,7 +95,7 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
|
|||
}
|
||||
}
|
||||
|
||||
func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) {
|
||||
func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (*viewAdapter, error) {
|
||||
analyzer := NewQueryAnalyzer(storage)
|
||||
analyzer.AnalyzeQueries(node)
|
||||
viewBuilder := metric.NewViewRequestBuilder()
|
||||
|
@ -106,13 +106,13 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.
|
|||
viewBuilder.GetMetricAtTime(fingerprint, timestamp)
|
||||
}
|
||||
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second)
|
||||
if err == nil {
|
||||
viewAdapter = NewViewAdapter(view, storage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
return NewViewAdapter(view, storage), nil
|
||||
}
|
||||
|
||||
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) {
|
||||
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (*viewAdapter, error) {
|
||||
analyzer := NewQueryAnalyzer(storage)
|
||||
analyzer.AnalyzeQueries(node)
|
||||
viewBuilder := metric.NewViewRequestBuilder()
|
||||
|
@ -126,8 +126,8 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva
|
|||
viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval)
|
||||
}
|
||||
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second)
|
||||
if err == nil {
|
||||
viewAdapter = NewViewAdapter(view, storage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
return NewViewAdapter(view, storage), nil
|
||||
}
|
||||
|
|
|
@ -31,15 +31,15 @@ type RecordingRule struct {
|
|||
|
||||
func (rule RecordingRule) Name() string { return rule.name }
|
||||
|
||||
func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
|
||||
func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
return ast.EvalVectorInstant(rule.vector, timestamp, storage)
|
||||
}
|
||||
|
||||
func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
|
||||
func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error) {
|
||||
// Get the raw value of the rule expression.
|
||||
vector, err = rule.EvalRaw(timestamp, storage)
|
||||
vector, err := rule.EvalRaw(timestamp, storage)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Override the metric name and labels.
|
||||
|
@ -53,7 +53,8 @@ func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorag
|
|||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
return vector, nil
|
||||
}
|
||||
|
||||
func (rule RecordingRule) ToDotGraph() string {
|
||||
|
|
|
@ -26,9 +26,9 @@ type Rule interface {
|
|||
Name() string
|
||||
// EvalRaw evaluates the rule's vector expression without triggering any
|
||||
// other actions, like recording or alerting.
|
||||
EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error)
|
||||
EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error)
|
||||
// Eval evaluates the rule, including any associated recording or alerting actions.
|
||||
Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error)
|
||||
Eval(timestamp time.Time, storage *metric.TieredStorage) (ast.Vector, error)
|
||||
// ToDotGraph returns a Graphviz dot graph of the rule.
|
||||
ToDotGraph() string
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge
|
|||
}.ToDTO()
|
||||
curationValue := &dto.CurationValue{}
|
||||
|
||||
rawKey := coding.NewProtocolBuffer(curationKey)
|
||||
rawKey := coding.NewPBEncoder(curationKey)
|
||||
|
||||
has, err := states.Has(rawKey)
|
||||
if err != nil {
|
||||
|
@ -341,7 +341,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|||
FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
|
||||
}
|
||||
|
||||
prospectiveKey := coding.NewProtocolBuffer(startKey.ToDTO()).MustEncode()
|
||||
prospectiveKey := coding.NewPBEncoder(startKey.ToDTO()).MustEncode()
|
||||
if !w.sampleIterator.Seek(prospectiveKey) {
|
||||
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
|
||||
// no work may occur, and the iterator cannot be recovered.
|
||||
|
@ -386,7 +386,7 @@ func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished t
|
|||
LastCompletionTimestamp: finished,
|
||||
}.ToDTO()
|
||||
|
||||
err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue))
|
||||
err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue))
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -115,7 +115,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
|
|||
Timestamp: upperSeek,
|
||||
}
|
||||
|
||||
raw := coding.NewProtocolBuffer(key).MustEncode()
|
||||
raw := coding.NewPBEncoder(key).MustEncode()
|
||||
i.Seek(raw)
|
||||
|
||||
if i.Key() == nil {
|
||||
|
@ -157,7 +157,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
|
|||
|
||||
key.Timestamp = lowerSeek
|
||||
|
||||
raw = coding.NewProtocolBuffer(key).MustEncode()
|
||||
raw = coding.NewPBEncoder(key).MustEncode()
|
||||
|
||||
i.Seek(raw)
|
||||
|
||||
|
|
|
@ -305,7 +305,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
|||
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||
}
|
||||
|
||||
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
|
||||
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value))
|
||||
}
|
||||
|
||||
err = l.labelNameToFingerprints.Commit(batch)
|
||||
|
@ -377,7 +377,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
|||
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||
}
|
||||
|
||||
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
|
||||
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value))
|
||||
}
|
||||
|
||||
err = l.labelSetToFingerprints.Commit(batch)
|
||||
|
@ -403,8 +403,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
|
|||
defer batch.Close()
|
||||
|
||||
for fingerprint, metric := range metrics {
|
||||
key := coding.NewProtocolBuffer(fingerprint.ToDTO())
|
||||
value := coding.NewProtocolBuffer(model.MetricToDTO(metric))
|
||||
key := coding.NewPBEncoder(fingerprint.ToDTO())
|
||||
value := coding.NewPBEncoder(model.MetricToDTO(metric))
|
||||
batch.Put(key, value)
|
||||
}
|
||||
|
||||
|
@ -469,7 +469,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
|||
|
||||
// WART: We should probably encode simple fingerprints.
|
||||
for _, metric := range absentMetrics {
|
||||
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
|
||||
key := coding.NewPBEncoder(model.MetricToDTO(metric))
|
||||
batch.Put(key, key)
|
||||
}
|
||||
|
||||
|
@ -498,7 +498,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
|||
value := &dto.MetricHighWatermark{}
|
||||
raw := []byte{}
|
||||
newestSampleTimestamp := samples[len(samples)-1].Timestamp
|
||||
keyEncoded := coding.NewProtocolBuffer(key)
|
||||
keyEncoded := coding.NewPBEncoder(key)
|
||||
|
||||
key.Signature = proto.String(fingerprint.ToRowKey())
|
||||
raw, err = l.MetricHighWatermarks.Get(keyEncoded)
|
||||
|
@ -517,7 +517,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
|||
}
|
||||
}
|
||||
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
||||
batch.Put(keyEncoded, coding.NewProtocolBuffer(value))
|
||||
batch.Put(keyEncoded, coding.NewPBEncoder(value))
|
||||
mutationCount++
|
||||
}
|
||||
|
||||
|
@ -592,7 +592,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
|||
})
|
||||
}
|
||||
|
||||
samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
|
||||
samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -665,7 +665,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
|
|||
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
|
||||
}(time.Now())
|
||||
|
||||
dtoKey := coding.NewProtocolBuffer(dto)
|
||||
dtoKey := coding.NewPBEncoder(dto)
|
||||
value, err = l.metricMembershipIndex.Has(dtoKey)
|
||||
|
||||
return
|
||||
|
@ -678,7 +678,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
|
|||
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
|
||||
}(time.Now())
|
||||
|
||||
dtoKey := coding.NewProtocolBuffer(dto)
|
||||
dtoKey := coding.NewPBEncoder(dto)
|
||||
value, err = l.labelSetToFingerprints.Has(dtoKey)
|
||||
|
||||
return
|
||||
|
@ -691,7 +691,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
|
|||
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
|
||||
}(time.Now())
|
||||
|
||||
dtoKey := coding.NewProtocolBuffer(dto)
|
||||
dtoKey := coding.NewPBEncoder(dto)
|
||||
value, err = l.labelNameToFingerprints.Has(dtoKey)
|
||||
|
||||
return
|
||||
|
@ -707,7 +707,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
|
|||
sets := []utility.Set{}
|
||||
|
||||
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
|
||||
f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO))
|
||||
f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO))
|
||||
if err != nil {
|
||||
return fps, err
|
||||
}
|
||||
|
@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
|
|||
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
|
||||
}(time.Now())
|
||||
|
||||
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName)))
|
||||
raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName)))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -779,7 +779,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
|
|||
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
|
||||
}(time.Now())
|
||||
|
||||
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f)))
|
||||
raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f)))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
|
|||
|
||||
case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
|
||||
if !keyDropped {
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
key := coding.NewPBEncoder(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
keyDropped = true
|
||||
}
|
||||
|
@ -165,14 +165,14 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
|
|||
// If the number of pending writes equals the target group size
|
||||
case len(pendingSamples) == p.MinimumGroupSize:
|
||||
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
|
||||
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
|
||||
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
|
||||
key := coding.NewPBEncoder(newSampleKey.ToDTO())
|
||||
value := coding.NewPBEncoder(pendingSamples.ToDTO())
|
||||
pendingBatch.Put(key, value)
|
||||
pendingMutations++
|
||||
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
|
||||
if len(sampleValues) > 0 {
|
||||
if !keyDropped {
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
key := coding.NewPBEncoder(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
keyDropped = true
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
|
|||
|
||||
case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
|
||||
if !keyDropped {
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
key := coding.NewPBEncoder(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
keyDropped = true
|
||||
}
|
||||
|
@ -211,8 +211,8 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
|
|||
if len(sampleValues) > 0 || len(pendingSamples) > 0 {
|
||||
pendingSamples = append(sampleValues, pendingSamples...)
|
||||
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
|
||||
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
|
||||
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
|
||||
key := coding.NewPBEncoder(newSampleKey.ToDTO())
|
||||
value := coding.NewPBEncoder(pendingSamples.ToDTO())
|
||||
pendingBatch.Put(key, value)
|
||||
pendingSamples = model.Values{}
|
||||
pendingMutations++
|
||||
|
@ -320,14 +320,14 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist
|
|||
pendingBatch = nil
|
||||
|
||||
case !sampleKey.MayContain(stopAt):
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
key := coding.NewPBEncoder(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
lastCurated = sampleKey.LastTimestamp
|
||||
sampleValues = model.Values{}
|
||||
pendingMutations++
|
||||
|
||||
case sampleKey.MayContain(stopAt):
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
key := coding.NewPBEncoder(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
pendingMutations++
|
||||
|
||||
|
@ -335,8 +335,8 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist
|
|||
if len(sampleValues) > 0 {
|
||||
sampleKey = sampleValues.ToSampleKey(fingerprint)
|
||||
lastCurated = sampleKey.FirstTimestamp
|
||||
newKey := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
newValue := coding.NewProtocolBuffer(sampleValues.ToDTO())
|
||||
newKey := coding.NewPBEncoder(sampleKey.ToDTO())
|
||||
newValue := coding.NewPBEncoder(sampleValues.ToDTO())
|
||||
pendingBatch.Put(newKey, newValue)
|
||||
pendingMutations++
|
||||
} else {
|
||||
|
|
|
@ -61,14 +61,14 @@ func (c curationState) Get() (key, value coding.Encoder) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
key = coding.NewProtocolBuffer(model.CurationKey{
|
||||
key = coding.NewPBEncoder(model.CurationKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint),
|
||||
ProcessorMessageRaw: signature,
|
||||
ProcessorMessageTypeName: c.processor.Name(),
|
||||
IgnoreYoungerThan: c.ignoreYoungerThan,
|
||||
}.ToDTO())
|
||||
|
||||
value = coding.NewProtocolBuffer(model.CurationRemark{
|
||||
value = coding.NewPBEncoder(model.CurationRemark{
|
||||
LastCompletionTimestamp: c.lastCurated,
|
||||
}.ToDTO())
|
||||
|
||||
|
@ -76,20 +76,20 @@ func (c curationState) Get() (key, value coding.Encoder) {
|
|||
}
|
||||
|
||||
func (w watermarkState) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
|
||||
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
|
||||
key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
|
||||
value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
|
||||
return
|
||||
}
|
||||
|
||||
func (s sampleGroup) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(model.SampleKey{
|
||||
key = coding.NewPBEncoder(model.SampleKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint),
|
||||
FirstTimestamp: s.values[0].Timestamp,
|
||||
LastTimestamp: s.values[len(s.values)-1].Timestamp,
|
||||
SampleCount: uint32(len(s.values)),
|
||||
}.ToDTO())
|
||||
|
||||
value = coding.NewProtocolBuffer(s.values.ToDTO())
|
||||
value = coding.NewPBEncoder(s.values.ToDTO())
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -194,7 +194,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i
|
|||
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
||||
}
|
||||
|
||||
e := coding.NewProtocolBuffer(k).MustEncode()
|
||||
e := coding.NewPBEncoder(k).MustEncode()
|
||||
|
||||
iterator := l.MetricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
|
|
@ -488,7 +488,7 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
|
|||
}
|
||||
|
||||
// Try seeking to target key.
|
||||
rawKey := coding.NewProtocolBuffer(targetKey).MustEncode()
|
||||
rawKey := coding.NewPBEncoder(targetKey).MustEncode()
|
||||
iterator.Seek(rawKey)
|
||||
|
||||
foundKey, err := extractSampleKey(iterator)
|
||||
|
|
|
@ -15,13 +15,18 @@ package leveldb
|
|||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage/raw"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
)
|
||||
|
||||
type indexValue struct{}
|
||||
|
||||
func (i *indexValue) MustEncode() []byte {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
var (
|
||||
existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{})
|
||||
existenceValue = &indexValue{}
|
||||
)
|
||||
|
||||
type LevelDBMembershipIndex struct {
|
||||
|
|
Loading…
Reference in a new issue