From 5817cb5bde7be713ef486e7eadaeb171e4e8eae9 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 25 Dec 2016 00:37:46 +0100 Subject: [PATCH] *: migrate from model.* to promql.* types --- notifier/notifier.go | 93 +++++++++++++++----- pkg/labels/labels.go | 11 ++- rules/alerting.go | 106 +++++++++++------------ rules/manager.go | 17 +++- rules/recording.go | 49 +++++------ template/template.go | 32 ++++--- web/api/v1/api.go | 103 ++++++++++++----------- web/federate.go | 196 ++++++++++++++++++++++--------------------- 8 files changed, 343 insertions(+), 264 deletions(-) diff --git a/notifier/notifier.go b/notifier/notifier.go index 5cee01eac..482014b5c 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/retrieval" ) @@ -50,10 +51,57 @@ const ( alertmanagerLabel = "alertmanager" ) +// Alert is a generic representation of an alert in the Prometheus eco-system. +type Alert struct { + // Label value pairs for purpose of aggregation, matching, and disposition + // dispatching. This must minimally include an "alertname" label. + Labels labels.Labels `json:"labels"` + + // Extra key/value information which does not define alert identity. + Annotations labels.Labels `json:"annotations"` + + // The known time range for this alert. Both ends are optional. + StartsAt time.Time `json:"startsAt,omitempty"` + EndsAt time.Time `json:"endsAt,omitempty"` + GeneratorURL string `json:"generatorURL,omitempty"` +} + +// Name returns the name of the alert. It is equivalent to the "alertname" label. +func (a *Alert) Name() string { + return a.Labels.Get(labels.AlertName) +} + +// Hash returns a hash over the alert. It is equivalent to the alert labels hash. +func (a *Alert) Hash() uint64 { + return a.Labels.Hash() +} + +func (a *Alert) String() string { + s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7]) + if a.Resolved() { + return s + "[resolved]" + } + return s + "[active]" +} + +// Resolved returns true iff the activity interval ended in the past. +func (a *Alert) Resolved() bool { + return a.ResolvedAt(time.Now()) +} + +// ResolvedAt returns true off the activity interval ended before +// the given timestamp. +func (a *Alert) ResolvedAt(ts time.Time) bool { + if a.EndsAt.IsZero() { + return false + } + return !a.EndsAt.After(ts) +} + // Notifier is responsible for dispatching alert notifications to an // alert manager service. type Notifier struct { - queue model.Alerts + queue []*Alert opts *Options more chan struct{} @@ -84,7 +132,7 @@ func New(o *Options) *Notifier { ctx, cancel := context.WithCancel(context.Background()) return &Notifier{ - queue: make(model.Alerts, 0, o.QueueCapacity), + queue: make([]*Alert, 0, o.QueueCapacity), ctx: ctx, cancel: cancel, more: make(chan struct{}, 1), @@ -182,17 +230,17 @@ func (n *Notifier) queueLen() int { return len(n.queue) } -func (n *Notifier) nextBatch() []*model.Alert { +func (n *Notifier) nextBatch() []*Alert { n.mtx.Lock() defer n.mtx.Unlock() - var alerts model.Alerts + var alerts []*Alert if len(n.queue) > maxBatchSize { - alerts = append(make(model.Alerts, 0, maxBatchSize), n.queue[:maxBatchSize]...) + alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...) n.queue = n.queue[maxBatchSize:] } else { - alerts = append(make(model.Alerts, 0, len(n.queue)), n.queue...) + alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...) n.queue = n.queue[:0] } @@ -221,15 +269,17 @@ func (n *Notifier) Run() { // Send queues the given notification requests for processing. // Panics if called on a handler that is not running. -func (n *Notifier) Send(alerts ...*model.Alert) { +func (n *Notifier) Send(alerts ...*Alert) { n.mtx.Lock() defer n.mtx.Unlock() // Attach external labels before relabelling and sending. for _, a := range alerts { + lb := labels.NewBuilder(a.Labels) + for ln, lv := range n.opts.ExternalLabels { - if _, ok := a.Labels[ln]; !ok { - a.Labels[ln] = lv + if a.Labels.Get(string(ln)) == "" { + lb.Set(string(ln), string(lv)) } } } @@ -259,16 +309,19 @@ func (n *Notifier) Send(alerts ...*model.Alert) { n.setMore() } -func (n *Notifier) relabelAlerts(alerts []*model.Alert) []*model.Alert { - var relabeledAlerts []*model.Alert - for _, alert := range alerts { - labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...) - if labels != nil { - alert.Labels = labels - relabeledAlerts = append(relabeledAlerts, alert) - } - } - return relabeledAlerts +func (n *Notifier) relabelAlerts(alerts []*Alert) []*Alert { + // TODO(fabxc): temporarily disabled. + return alerts + + // var relabeledAlerts []*Alert + // for _, alert := range alerts { + // labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...) + // if labels != nil { + // alert.Labels = labels + // relabeledAlerts = append(relabeledAlerts, alert) + // } + // } + // return relabeledAlerts } // setMore signals that the alert queue has items. @@ -302,7 +355,7 @@ func (n *Notifier) Alertmanagers() []string { // sendAll sends the alerts to all configured Alertmanagers concurrently. // It returns true if the alerts could be sent successfully to at least one Alertmanager. -func (n *Notifier) sendAll(alerts ...*model.Alert) bool { +func (n *Notifier) sendAll(alerts ...*Alert) bool { begin := time.Now() b, err := json.Marshal(alerts) diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index f29fb290a..5795fb3dc 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -61,6 +61,13 @@ func (ls Labels) Hash() uint64 { return xxhash.Sum64(b) } +// Copy returns a copy of the labels. +func (ls Labels) Copy() Labels { + res := make(Labels, len(ls)) + copy(res, ls) + return res +} + // Get returns the value for the label with the given name. // Returns an empty string if the label doesn't exist. func (ls Labels) Get(name string) string { @@ -72,8 +79,8 @@ func (ls Labels) Get(name string) string { return "" } -// Equals returns whether the two label sets are equal. -func (ls Labels) Equals(o Labels) bool { +// Equal returns whether the two label sets are equal. +func Equal(ls, o Labels) bool { if len(ls) != len(o) { return false } diff --git a/rules/alerting.go b/rules/alerting.go index 6194c4afa..14a3f5a19 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/util/strutil" @@ -32,12 +33,12 @@ import ( const ( // AlertMetricName is the metric name for synthetic alert timeseries. - alertMetricName model.LabelValue = "ALERTS" + alertMetricName = "ALERTS" // AlertNameLabel is the label name indicating the name of an alert. - alertNameLabel model.LabelName = "alertname" + alertNameLabel = "alertname" // AlertStateLabel is the label name indicating the state of an alert. - alertStateLabel model.LabelName = "alertstate" + alertStateLabel = "alertstate" ) // AlertState denotes the state of an active alert. @@ -68,11 +69,13 @@ func (s AlertState) String() string { // Alert is the user-level representation of a single instance of an alerting rule. type Alert struct { - State AlertState - Labels model.LabelSet - Annotations model.LabelSet + State AlertState + + Labels labels.Labels + Annotations labels.Labels + // The value at the last evaluation of the alerting expression. - Value model.SampleValue + Value float64 // The interval during which the condition of this alert held true. // ResolvedAt will be 0 to indicate a still active alert. ActiveAt, ResolvedAt model.Time @@ -88,26 +91,26 @@ type AlertingRule struct { // output vector before an alert transitions from Pending to Firing state. holdDuration time.Duration // Extra labels to attach to the resulting alert sample vectors. - labels model.LabelSet + labels labels.Labels // Non-identifying key/value pairs. - annotations model.LabelSet + annotations labels.Labels // Protects the below. mtx sync.Mutex // A map of alerts which are currently active (Pending or Firing), keyed by // the fingerprint of the labelset they correspond to. - active map[model.Fingerprint]*Alert + active map[uint64]*Alert } // NewAlertingRule constructs a new AlertingRule. -func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns model.LabelSet) *AlertingRule { +func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels) *AlertingRule { return &AlertingRule{ name: name, vector: vec, holdDuration: hold, labels: lbls, annotations: anns, - active: map[model.Fingerprint]*Alert{}, + active: map[uint64]*Alert{}, } } @@ -117,27 +120,26 @@ func (r *AlertingRule) Name() string { } func (r *AlertingRule) equal(o *AlertingRule) bool { - return r.name == o.name && r.labels.Equal(o.labels) + return r.name == o.name && labels.Equal(r.labels, o.labels) } -func (r *AlertingRule) sample(alert *Alert, ts model.Time, set bool) *model.Sample { - metric := model.Metric(r.labels.Clone()) +func (r *AlertingRule) sample(alert *Alert, ts model.Time, set bool) promql.Sample { + lb := labels.NewBuilder(r.labels) - for ln, lv := range alert.Labels { - metric[ln] = lv + for _, l := range alert.Labels { + lb.Set(l.Name, l.Value) } - metric[model.MetricNameLabel] = alertMetricName - metric[model.AlertNameLabel] = model.LabelValue(r.name) - metric[alertStateLabel] = model.LabelValue(alert.State.String()) + lb.Set(labels.MetricName, alertMetricName) + lb.Set(labels.AlertName, r.name) + lb.Set(alertStateLabel, alert.State.String()) - s := &model.Sample{ - Metric: metric, - Timestamp: ts, - Value: 0, + s := promql.Sample{ + Metric: lb.Labels(), + Point: promql.Point{T: int64(ts), V: 0}, } if set { - s.Value = 1 + s.V = 1 } return s } @@ -148,8 +150,8 @@ const resolvedRetention = 15 * time.Minute // Eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) { - query, err := engine.NewInstantQuery(r.vector.String(), ts) +func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (promql.Vector, error) { + query, err := engine.NewInstantQuery(r.vector.String(), ts.Time()) if err != nil { return nil, err } @@ -163,13 +165,13 @@ func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.E // Create pending alerts for any new vector elements in the alert expression // or update the expression value for existing elements. - resultFPs := map[model.Fingerprint]struct{}{} + resultFPs := map[uint64]struct{}{} for _, smpl := range res { // Provide the alert information to the template. l := make(map[string]string, len(smpl.Metric)) - for k, v := range smpl.Metric { - l[string(k)] = string(v) + for _, lbl := range smpl.Metric { + l[lbl.Name] = lbl.Value } tmplData := struct { @@ -177,13 +179,13 @@ func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.E Value float64 }{ Labels: l, - Value: float64(smpl.Value), + Value: smpl.V, } // Inject some convenience variables that are easier to remember for users // who are not used to Go's templating system. defs := "{{$labels := .Labels}}{{$value := .Value}}" - expand := func(text model.LabelValue) model.LabelValue { + expand := func(text string) string { tmpl := template.NewTemplateExpander( ctx, defs+string(text), @@ -198,44 +200,42 @@ func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.E result = fmt.Sprintf("", err) log.Warnf("Error expanding alert template %v with data '%v': %s", r.Name(), tmplData, err) } - return model.LabelValue(result) + return result } - delete(smpl.Metric, model.MetricNameLabel) - labels := make(model.LabelSet, len(smpl.Metric)+len(r.labels)+1) - for ln, lv := range smpl.Metric { - labels[ln] = lv - } - for ln, lv := range r.labels { - labels[ln] = expand(lv) - } - labels[model.AlertNameLabel] = model.LabelValue(r.Name()) + lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName) - annotations := make(model.LabelSet, len(r.annotations)) - for an, av := range r.annotations { - annotations[an] = expand(av) + for _, l := range r.labels { + lb.Set(l.Name, expand(l.Value)) } - fp := smpl.Metric.Fingerprint() - resultFPs[fp] = struct{}{} + lb.Set(labels.AlertName, r.Name()) + + annotations := make(labels.Labels, 0, len(r.annotations)) + for _, a := range r.annotations { + annotations = append(annotations, labels.Label{Name: a.Name, Value: expand(a.Value)}) + } + + h := smpl.Metric.Hash() + resultFPs[h] = struct{}{} // Check whether we already have alerting state for the identifying label set. // Update the last value and annotations if so, create a new alert entry otherwise. - if alert, ok := r.active[fp]; ok && alert.State != StateInactive { - alert.Value = smpl.Value + if alert, ok := r.active[h]; ok && alert.State != StateInactive { + alert.Value = smpl.V alert.Annotations = annotations continue } - r.active[fp] = &Alert{ - Labels: labels, + r.active[h] = &Alert{ + Labels: lb.Labels(), Annotations: annotations, ActiveAt: ts, State: StatePending, - Value: smpl.Value, + Value: smpl.V, } } - var vec model.Vector + var vec promql.Vector // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, a := range r.active { if _, ok := resultFPs[fp]; !ok { diff --git a/rules/manager.go b/rules/manager.go index ca5d4185a..9d211bd8c 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -20,6 +20,7 @@ import ( "path/filepath" "sync" "time" + "unsafe" html_template "html/template" @@ -106,7 +107,7 @@ const ( type Rule interface { Name() string // eval evaluates the rule, including any associated recording or alerting actions. - Eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error) + Eval(context.Context, model.Time, *promql.Engine, string) (promql.Vector, error) // String returns a human-readable string representation of the rule. String() string // HTMLSnippet returns a human-readable string representation of the rule, @@ -278,8 +279,16 @@ func (g *Group) Eval() { numOutOfOrder = 0 numDuplicates = 0 ) + for _, s := range vector { - if err := g.opts.SampleAppender.Append(s); err != nil { + // TODO(fabxc): adjust after reworking appending. + var ms model.Sample + lbls := s.Metric.Map() + ms.Metric = *(*model.Metric)(unsafe.Pointer(&lbls)) + ms.Timestamp = model.Time(s.T) + ms.Value = model.SampleValue(s.V) + + if err := g.opts.SampleAppender.Append(&ms); err != nil { switch err { case local.ErrOutOfOrderSample: numOutOfOrder++ @@ -305,7 +314,7 @@ func (g *Group) Eval() { // sendAlerts sends alert notifications for the given rule. func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error { - var alerts model.Alerts + var alerts []*notifier.Alert for _, alert := range rule.currentAlerts() { // Only send actually firing alerts. @@ -313,7 +322,7 @@ func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error { continue } - a := &model.Alert{ + a := ¬ifier.Alert{ StartsAt: alert.ActiveAt.Add(rule.holdDuration).Time(), Labels: alert.Labels, Annotations: alert.Annotations, diff --git a/rules/recording.go b/rules/recording.go index a4b7a4b8d..2a3124875 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/strutil" ) @@ -28,15 +29,15 @@ import ( type RecordingRule struct { name string vector promql.Expr - labels model.LabelSet + labels labels.Labels } // NewRecordingRule returns a new recording rule. -func NewRecordingRule(name string, vector promql.Expr, labels model.LabelSet) *RecordingRule { +func NewRecordingRule(name string, vector promql.Expr, lset labels.Labels) *RecordingRule { return &RecordingRule{ name: name, vector: vector, - labels: labels, + labels: lset, } } @@ -46,35 +47,27 @@ func (rule RecordingRule) Name() string { } // Eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) { - query, err := engine.NewInstantQuery(rule.vector.String(), timestamp) +func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (promql.Vector, error) { + query, err := engine.NewInstantQuery(rule.vector.String(), timestamp.Time()) if err != nil { return nil, err } var ( result = query.Exec(ctx) - vector model.Vector + vector promql.Vector ) if result.Err != nil { return nil, err } - switch result.Value.(type) { - case model.Vector: - vector, err = result.Vector() - if err != nil { - return nil, err - } - case *model.Scalar: - scalar, err := result.Scalar() - if err != nil { - return nil, err - } - vector = model.Vector{&model.Sample{ - Value: scalar.Value, - Timestamp: scalar.Timestamp, - Metric: model.Metric{}, + switch v := result.Value.(type) { + case promql.Vector: + vector = v + case promql.Scalar: + vector = promql.Vector{promql.Sample{ + Point: promql.Point(v), + Metric: labels.Labels{}, }} default: return nil, fmt.Errorf("rule result is not a vector or scalar") @@ -82,15 +75,19 @@ func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine // Override the metric name and labels. for _, sample := range vector { - sample.Metric[model.MetricNameLabel] = model.LabelValue(rule.name) + lb := labels.NewBuilder(sample.Metric) - for label, value := range rule.labels { - if value == "" { - delete(sample.Metric, label) + lb.Set(labels.MetricName, rule.name) + + for _, l := range rule.labels { + if l.Value == "" { + lb.Del(l.Name) } else { - sample.Metric[label] = value + lb.Set(l.Name, l.Value) } } + + sample.Metric = lb.Labels() } return vector, nil diff --git a/template/template.go b/template/template.go index e02c1b51c..8ee9c4a26 100644 --- a/template/template.go +++ b/template/template.go @@ -27,6 +27,7 @@ import ( text_template "text/template" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" @@ -66,22 +67,21 @@ func query(ctx context.Context, q string, ts time.Time, queryEngine *promql.Engi if res.Err != nil { return nil, res.Err } - var vector model.Vector + var vector promql.Vector switch v := res.Value.(type) { - case model.Matrix: + case promql.Matrix: return nil, errors.New("matrix return values not supported") - case model.Vector: + case promql.Vector: vector = v - case *model.Scalar: - vector = model.Vector{&model.Sample{ - Value: v.Value, - Timestamp: v.Timestamp, + case promql.Scalar: + vector = promql.Vector{promql.Sample{ + Point: promql.Point(v), }} - case *model.String: - vector = model.Vector{&model.Sample{ - Metric: model.Metric{"__value__": model.LabelValue(v.Value)}, - Timestamp: v.Timestamp, + case promql.String: + vector = promql.Vector{promql.Sample{ + Metric: labels.FromStrings("__value__", v.V), + Point: promql.Point{T: v.T}, }} default: panic("template.query: unhandled result value type") @@ -89,14 +89,12 @@ func query(ctx context.Context, q string, ts time.Time, queryEngine *promql.Engi // promql.Vector is hard to work with in templates, so convert to // base data types. + // TODO(fabxc): probably not true anymore after type rework. var result = make(queryResult, len(vector)) for n, v := range vector { s := sample{ - Value: float64(v.Value), - Labels: make(map[string]string), - } - for label, value := range v.Metric { - s.Labels[string(label)] = string(value) + Value: v.V, + Labels: v.Metric.Map(), } result[n] = &s } @@ -119,7 +117,7 @@ func NewTemplateExpander(ctx context.Context, text string, name string, data int data: data, funcMap: text_template.FuncMap{ "query": func(q string) (queryResult, error) { - return query(ctx, q, timestamp, queryEngine) + return query(ctx, q, timestamp.Time(), queryEngine) }, "first": func(v queryResult) (*sample, error) { if len(v) > 0 { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 340a6c58f..f011cfe65 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -17,6 +17,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "net/http" "sort" "strconv" @@ -30,7 +31,6 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/storage/local" - "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/util/httputil" ) @@ -96,7 +96,7 @@ type API struct { targetRetriever targetRetriever context func(r *http.Request) context.Context - now func() model.Time + now func() time.Time } // NewAPI returns an initialized API type. @@ -106,7 +106,7 @@ func NewAPI(qe *promql.Engine, st local.Storage, tr targetRetriever) *API { Storage: st, targetRetriever: tr, context: route.Context, - now: model.Now, + now: time.Now, } } @@ -142,8 +142,8 @@ func (api *API) Register(r *route.Router) { } type queryData struct { - ResultType model.ValueType `json:"resultType"` - Result model.Value `json:"result"` + ResultType promql.ValueType `json:"resultType"` + Result promql.Value `json:"result"` } func (api *API) options(r *http.Request) (interface{}, *apiError) { @@ -151,7 +151,7 @@ func (api *API) options(r *http.Request) (interface{}, *apiError) { } func (api *API) query(r *http.Request) (interface{}, *apiError) { - var ts model.Time + var ts time.Time if t := r.FormValue("time"); t != "" { var err error ts, err = parseTime(t) @@ -262,7 +262,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} } - var start model.Time + var start time.Time if t := r.FormValue("start"); t != "" { var err error start, err = parseTime(t) @@ -270,10 +270,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } } else { - start = model.Earliest + start = time.Unix(math.MinInt64, 0) } - var end model.Time + var end time.Time if t := r.FormValue("end"); t != "" { var err error end, err = parseTime(t) @@ -281,10 +281,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } } else { - end = model.Latest + end = time.Unix(math.MaxInt64, 0) } - var matcherSets []metric.LabelMatchers + var matcherSets [][]*promql.LabelMatcher for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { @@ -293,22 +293,26 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { matcherSets = append(matcherSets, matchers) } - q, err := api.Storage.Querier() - if err != nil { - return nil, &apiError{errorExec, err} - } - defer q.Close() + // TODO(fabxc): temporarily disbaled. + _, _ = start, end + panic("disabled") - res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) - if err != nil { - return nil, &apiError{errorExec, err} - } + // q, err := api.Storage.Querier() + // if err != nil { + // return nil, &apiError{errorExec, err} + // } + // defer q.Close() - metrics := make([]model.Metric, 0, len(res)) - for _, met := range res { - metrics = append(metrics, met.Metric) - } - return metrics, nil + // res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) + // if err != nil { + // return nil, &apiError{errorExec, err} + // } + + // metrics := make([]model.Metric, 0, len(res)) + // for _, met := range res { + // metrics = append(metrics, met.Metric) + // } + // return metrics, nil } func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { @@ -317,25 +321,28 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} } - numDeleted := 0 - for _, s := range r.Form["match[]"] { - matchers, err := promql.ParseMetricSelector(s) - if err != nil { - return nil, &apiError{errorBadData, err} - } - n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...) - if err != nil { - return nil, &apiError{errorExec, err} - } - numDeleted += n - } + // TODO(fabxc): temporarily disabled + panic("disabled") - res := struct { - NumDeleted int `json:"numDeleted"` - }{ - NumDeleted: numDeleted, - } - return res, nil + // numDeleted := 0 + // for _, s := range r.Form["match[]"] { + // matchers, err := promql.ParseMetricSelector(s) + // if err != nil { + // return nil, &apiError{errorBadData, err} + // } + // n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...) + // if err != nil { + // return nil, &apiError{errorExec, err} + // } + // numDeleted += n + // } + + // res := struct { + // NumDeleted int `json:"numDeleted"` + // }{ + // NumDeleted: numDeleted, + // } + // return res, nil } type Target struct { @@ -417,15 +424,15 @@ func respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) { w.Write(b) } -func parseTime(s string) (model.Time, error) { +func parseTime(s string) (time.Time, error) { if t, err := strconv.ParseFloat(s, 64); err == nil { - ts := int64(t * float64(time.Second)) - return model.TimeFromUnixNano(ts), nil + s, ns := math.Modf(t) + return time.Unix(int64(s), int64(ns*float64(time.Second))), nil } if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - return model.TimeFromUnixNano(t.UnixNano()), nil + return t, nil } - return 0, fmt.Errorf("cannot parse %q to a valid timestamp", s) + return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s) } func parseDuration(s string) (time.Duration, error) { diff --git a/web/federate.go b/web/federate.go index f910fbab3..00e845403 100644 --- a/web/federate.go +++ b/web/federate.go @@ -15,17 +15,13 @@ package web import ( "net/http" - "sort" - "github.com/golang/protobuf/proto" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage/metric" ) var ( @@ -41,7 +37,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { req.ParseForm() - var matcherSets []metric.LabelMatchers + var matcherSets [][]*promql.LabelMatcher for _, s := range req.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { @@ -52,102 +48,114 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } var ( - minTimestamp = h.now().Add(-promql.StalenessDelta) - format = expfmt.Negotiate(req.Header) - enc = expfmt.NewEncoder(w, format) + // minTimestamp = h.now().Add(-promql.StalenessDelta) + format = expfmt.Negotiate(req.Header) + // enc = expfmt.NewEncoder(w, format) ) w.Header().Set("Content-Type", string(format)) - q, err := h.storage.Querier() - if err != nil { - federationErrors.Inc() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer q.Close() + federationErrors.Inc() + http.Error(w, errors.Errorf("federation disabled").Error(), http.StatusInternalServerError) + return - vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) - if err != nil { - federationErrors.Inc() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - sort.Sort(byName(vector)) + // q, err := h.storage.Querier() + // if err != nil { + // federationErrors.Inc() + // http.Error(w, err.Error(), http.StatusInternalServerError) + // return + // } + // defer q.Close() - var ( - lastMetricName model.LabelValue - protMetricFam *dto.MetricFamily - ) - for _, s := range vector { - nameSeen := false - globalUsed := map[model.LabelName]struct{}{} - protMetric := &dto.Metric{ - Untyped: &dto.Untyped{}, - } + // TODO(fabxc): support via TSDB storage. - for ln, lv := range s.Metric { - if lv == "" { - // No value means unset. Never consider those labels. - // This is also important to protect against nameless metrics. - continue - } - if ln == model.MetricNameLabel { - nameSeen = true - if lv == lastMetricName { - // We already have the name in the current MetricFamily, - // and we ignore nameless metrics. - continue - } - // Need to start a new MetricFamily. Ship off the old one (if any) before - // creating the new one. - if protMetricFam != nil { - if err := enc.Encode(protMetricFam); err != nil { - federationErrors.Inc() - log.With("err", err).Error("federation failed") - return - } - } - protMetricFam = &dto.MetricFamily{ - Type: dto.MetricType_UNTYPED.Enum(), - Name: proto.String(string(lv)), - } - lastMetricName = lv - continue - } - protMetric.Label = append(protMetric.Label, &dto.LabelPair{ - Name: proto.String(string(ln)), - Value: proto.String(string(lv)), - }) - if _, ok := h.externalLabels[ln]; ok { - globalUsed[ln] = struct{}{} - } - } - if !nameSeen { - log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.") - continue - } - // Attach global labels if they do not exist yet. - for ln, lv := range h.externalLabels { - if _, ok := globalUsed[ln]; !ok { - protMetric.Label = append(protMetric.Label, &dto.LabelPair{ - Name: proto.String(string(ln)), - Value: proto.String(string(lv)), - }) - } - } + // var sets []tsdb.SeriesSet + // for _, matchers := range matcherSets { + // set, err := q.Select(matchers) + // sets = append(sets, set) + // } - protMetric.TimestampMs = proto.Int64(int64(s.Timestamp)) - protMetric.Untyped.Value = proto.Float64(float64(s.Value)) + // vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) + // if err != nil { + // federationErrors.Inc() + // http.Error(w, err.Error(), http.StatusInternalServerError) + // return + // } + // sort.Sort(byName(vector)) - protMetricFam.Metric = append(protMetricFam.Metric, protMetric) - } - // Still have to ship off the last MetricFamily, if any. - if protMetricFam != nil { - if err := enc.Encode(protMetricFam); err != nil { - federationErrors.Inc() - log.With("err", err).Error("federation failed") - } - } + // var ( + // lastMetricName model.LabelValue + // protMetricFam *dto.MetricFamily + // ) + // for _, s := range vector { + // nameSeen := false + // globalUsed := map[model.LabelName]struct{}{} + // protMetric := &dto.Metric{ + // Untyped: &dto.Untyped{}, + // } + + // for ln, lv := range s.Metric { + // if lv == "" { + // // No value means unset. Never consider those labels. + // // This is also important to protect against nameless metrics. + // continue + // } + // if ln == model.MetricNameLabel { + // nameSeen = true + // if lv == lastMetricName { + // // We already have the name in the current MetricFamily, + // // and we ignore nameless metrics. + // continue + // } + // // Need to start a new MetricFamily. Ship off the old one (if any) before + // // creating the new one. + // if protMetricFam != nil { + // if err := enc.Encode(protMetricFam); err != nil { + // federationErrors.Inc() + // log.With("err", err).Error("federation failed") + // return + // } + // } + // protMetricFam = &dto.MetricFamily{ + // Type: dto.MetricType_UNTYPED.Enum(), + // Name: proto.String(string(lv)), + // } + // lastMetricName = lv + // continue + // } + // protMetric.Label = append(protMetric.Label, &dto.LabelPair{ + // Name: proto.String(string(ln)), + // Value: proto.String(string(lv)), + // }) + // if _, ok := h.externalLabels[ln]; ok { + // globalUsed[ln] = struct{}{} + // } + // } + // if !nameSeen { + // log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.") + // continue + // } + // // Attach global labels if they do not exist yet. + // for ln, lv := range h.externalLabels { + // if _, ok := globalUsed[ln]; !ok { + // protMetric.Label = append(protMetric.Label, &dto.LabelPair{ + // Name: proto.String(string(ln)), + // Value: proto.String(string(lv)), + // }) + // } + // } + + // protMetric.TimestampMs = proto.Int64(int64(s.Timestamp)) + // protMetric.Untyped.Value = proto.Float64(float64(s.Value)) + + // protMetricFam.Metric = append(protMetricFam.Metric, protMetric) + // } + // // Still have to ship off the last MetricFamily, if any. + // if protMetricFam != nil { + // if err := enc.Encode(protMetricFam); err != nil { + // federationErrors.Inc() + // log.With("err", err).Error("federation failed") + // } + // } } // byName makes a model.Vector sortable by metric name.