mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-19 23:37:31 -07:00
notifier: fully use labels.Labels
This commit is contained in:
parent
f8fc1f5bb2
commit
8b4e4a9d2b
|
@ -35,7 +35,7 @@ import (
|
|||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
"github.com/prometheus/prometheus/pkg/relabel"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
)
|
||||
|
||||
|
@ -282,6 +282,8 @@ func (n *Notifier) Send(alerts ...*Alert) {
|
|||
lb.Set(string(ln), string(lv))
|
||||
}
|
||||
}
|
||||
|
||||
a.Labels = lb.Labels()
|
||||
}
|
||||
|
||||
alerts = n.relabelAlerts(alerts)
|
||||
|
@ -310,18 +312,16 @@ func (n *Notifier) Send(alerts ...*Alert) {
|
|||
}
|
||||
|
||||
func (n *Notifier) relabelAlerts(alerts []*Alert) []*Alert {
|
||||
// TODO(fabxc): temporarily disabled.
|
||||
return alerts
|
||||
var relabeledAlerts []*Alert
|
||||
|
||||
// var relabeledAlerts []*Alert
|
||||
// for _, alert := range alerts {
|
||||
// labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...)
|
||||
// if labels != nil {
|
||||
// alert.Labels = labels
|
||||
// relabeledAlerts = append(relabeledAlerts, alert)
|
||||
// }
|
||||
// }
|
||||
// return relabeledAlerts
|
||||
for _, alert := range alerts {
|
||||
labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...)
|
||||
if labels != nil {
|
||||
alert.Labels = labels
|
||||
relabeledAlerts = append(relabeledAlerts, alert)
|
||||
}
|
||||
}
|
||||
return relabeledAlerts
|
||||
}
|
||||
|
||||
// setMore signals that the alert queue has items.
|
||||
|
@ -452,15 +452,15 @@ type alertmanager interface {
|
|||
url() string
|
||||
}
|
||||
|
||||
type alertmanagerLabels model.LabelSet
|
||||
type alertmanagerLabels struct{ labels.Labels }
|
||||
|
||||
const pathLabel = "__alerts_path__"
|
||||
|
||||
func (a alertmanagerLabels) url() string {
|
||||
u := &url.URL{
|
||||
Scheme: string(a[model.SchemeLabel]),
|
||||
Host: string(a[model.AddressLabel]),
|
||||
Path: string(a[pathLabel]),
|
||||
Scheme: a.Get(model.SchemeLabel),
|
||||
Host: a.Get(model.AddressLabel),
|
||||
Path: a.Get(pathLabel),
|
||||
}
|
||||
return u.String()
|
||||
}
|
||||
|
@ -530,22 +530,30 @@ func postPath(pre string) string {
|
|||
func alertmanagerFromGroup(tg *config.TargetGroup, cfg *config.AlertmanagerConfig) ([]alertmanager, error) {
|
||||
var res []alertmanager
|
||||
|
||||
for _, lset := range tg.Targets {
|
||||
for _, tlset := range tg.Targets {
|
||||
lbls := make([]labels.Label, 0, len(tlset)+2+len(tg.Labels))
|
||||
|
||||
for ln, lv := range tlset {
|
||||
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
|
||||
}
|
||||
// Set configured scheme as the initial scheme label for overwrite.
|
||||
lset[model.SchemeLabel] = model.LabelValue(cfg.Scheme)
|
||||
lset[pathLabel] = model.LabelValue(postPath(cfg.PathPrefix))
|
||||
lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: cfg.Scheme})
|
||||
lbls = append(lbls, labels.Label{Name: pathLabel, Value: postPath(cfg.PathPrefix)})
|
||||
|
||||
// Combine target labels with target group labels.
|
||||
for ln, lv := range tg.Labels {
|
||||
if _, ok := lset[ln]; !ok {
|
||||
lset[ln] = lv
|
||||
if _, ok := tlset[ln]; !ok {
|
||||
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
|
||||
}
|
||||
}
|
||||
lset := relabel.Process(lset, cfg.RelabelConfigs...)
|
||||
|
||||
lset := relabel.Process(labels.New(lbls...), cfg.RelabelConfigs...)
|
||||
if lset == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
lb := labels.NewBuilder(lset)
|
||||
|
||||
// addPort checks whether we should add a default port to the address.
|
||||
// If the address is not valid, we don't append a port either.
|
||||
addPort := func(s string) bool {
|
||||
|
@ -558,10 +566,11 @@ func alertmanagerFromGroup(tg *config.TargetGroup, cfg *config.AlertmanagerConfi
|
|||
_, _, err := net.SplitHostPort(s + ":1234")
|
||||
return err == nil
|
||||
}
|
||||
addr := lset.Get(model.AddressLabel)
|
||||
// If it's an address with no trailing port, infer it based on the used scheme.
|
||||
if addr := string(lset[model.AddressLabel]); addPort(addr) {
|
||||
if addPort(addr) {
|
||||
// Addresses reaching this point are already wrapped in [] if necessary.
|
||||
switch lset[model.SchemeLabel] {
|
||||
switch lset.Get(model.SchemeLabel) {
|
||||
case "http", "":
|
||||
addr = addr + ":80"
|
||||
case "https":
|
||||
|
@ -569,21 +578,22 @@ func alertmanagerFromGroup(tg *config.TargetGroup, cfg *config.AlertmanagerConfi
|
|||
default:
|
||||
return nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
|
||||
}
|
||||
lset[model.AddressLabel] = model.LabelValue(addr)
|
||||
lb.Set(model.AddressLabel, addr)
|
||||
}
|
||||
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
|
||||
|
||||
if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Meta labels are deleted after relabelling. Other internal labels propagate to
|
||||
// the target which decides whether they will be part of their label set.
|
||||
for ln := range lset {
|
||||
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
|
||||
delete(lset, ln)
|
||||
for _, l := range lset {
|
||||
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
|
||||
lb.Del(l.Name)
|
||||
}
|
||||
}
|
||||
|
||||
res = append(res, alertmanagerLabels(lset))
|
||||
res = append(res, alertmanagerLabels{lset})
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -18,12 +18,12 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
)
|
||||
|
||||
func TestPostPath(t *testing.T) {
|
||||
|
@ -62,21 +62,19 @@ func TestHandlerNextBatch(t *testing.T) {
|
|||
h := New(&Options{})
|
||||
|
||||
for i := range make([]struct{}, 2*maxBatchSize+1) {
|
||||
h.queue = append(h.queue, &model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": model.LabelValue(fmt.Sprintf("%d", i)),
|
||||
},
|
||||
h.queue = append(h.queue, &Alert{
|
||||
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
|
||||
})
|
||||
}
|
||||
|
||||
expected := append(model.Alerts{}, h.queue...)
|
||||
expected := append([]*Alert{}, h.queue...)
|
||||
|
||||
b := h.nextBatch()
|
||||
|
||||
if len(b) != maxBatchSize {
|
||||
t.Fatalf("Expected first batch of length %d, but got %d", maxBatchSize, len(b))
|
||||
}
|
||||
if reflect.DeepEqual(expected[0:maxBatchSize], b) {
|
||||
if !alertsEqual(expected[0:maxBatchSize], b) {
|
||||
t.Fatalf("First batch did not match")
|
||||
}
|
||||
|
||||
|
@ -85,7 +83,7 @@ func TestHandlerNextBatch(t *testing.T) {
|
|||
if len(b) != maxBatchSize {
|
||||
t.Fatalf("Expected second batch of length %d, but got %d", maxBatchSize, len(b))
|
||||
}
|
||||
if reflect.DeepEqual(expected[maxBatchSize:2*maxBatchSize], b) {
|
||||
if !alertsEqual(expected[maxBatchSize:2*maxBatchSize], b) {
|
||||
t.Fatalf("Second batch did not match")
|
||||
}
|
||||
|
||||
|
@ -94,7 +92,7 @@ func TestHandlerNextBatch(t *testing.T) {
|
|||
if len(b) != 1 {
|
||||
t.Fatalf("Expected third batch of length %d, but got %d", 1, len(b))
|
||||
}
|
||||
if reflect.DeepEqual(expected[2*maxBatchSize:], b) {
|
||||
if !alertsEqual(expected[2*maxBatchSize:], b) {
|
||||
t.Fatalf("Third batch did not match")
|
||||
}
|
||||
|
||||
|
@ -103,12 +101,14 @@ func TestHandlerNextBatch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func alertsEqual(a, b model.Alerts) bool {
|
||||
func alertsEqual(a, b []*Alert) bool {
|
||||
if len(a) != len(b) {
|
||||
fmt.Println("len mismatch")
|
||||
return false
|
||||
}
|
||||
for i, alert := range a {
|
||||
if !alert.Labels.Equal(b[i].Labels) {
|
||||
if !labels.Equal(alert.Labels, b[i].Labels) {
|
||||
fmt.Println("mismatch", alert.Labels, b[i].Labels)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -117,14 +117,14 @@ func alertsEqual(a, b model.Alerts) bool {
|
|||
|
||||
func TestHandlerSendAll(t *testing.T) {
|
||||
var (
|
||||
expected model.Alerts
|
||||
expected []*Alert
|
||||
status1, status2 int
|
||||
)
|
||||
|
||||
f := func(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
|
||||
var alerts model.Alerts
|
||||
var alerts []*Alert
|
||||
if err := json.NewDecoder(r.Body).Decode(&alerts); err != nil {
|
||||
t.Fatalf("Unexpected error on input decoding: %s", err)
|
||||
}
|
||||
|
@ -162,15 +162,11 @@ func TestHandlerSendAll(t *testing.T) {
|
|||
})
|
||||
|
||||
for i := range make([]struct{}, maxBatchSize) {
|
||||
h.queue = append(h.queue, &model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": model.LabelValue(fmt.Sprintf("%d", i)),
|
||||
},
|
||||
h.queue = append(h.queue, &Alert{
|
||||
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
|
||||
})
|
||||
expected = append(expected, &model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": model.LabelValue(fmt.Sprintf("%d", i)),
|
||||
},
|
||||
expected = append(expected, &Alert{
|
||||
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -207,33 +203,19 @@ func TestExternalLabels(t *testing.T) {
|
|||
})
|
||||
|
||||
// This alert should get the external label attached.
|
||||
h.Send(&model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "test",
|
||||
},
|
||||
h.Send(&Alert{
|
||||
Labels: labels.FromStrings("alertname", "test"),
|
||||
})
|
||||
|
||||
// This alert should get the external label attached, but then set to "c"
|
||||
// through relabelling.
|
||||
h.Send(&model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "externalrelabelthis",
|
||||
},
|
||||
h.Send(&Alert{
|
||||
Labels: labels.FromStrings("alertname", "externalrelabelthis"),
|
||||
})
|
||||
|
||||
expected := []*model.Alert{
|
||||
{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "test",
|
||||
"a": "b",
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "externalrelabelthis",
|
||||
"a": "c",
|
||||
},
|
||||
},
|
||||
expected := []*Alert{
|
||||
{Labels: labels.FromStrings("alertname", "test", "a", "b")},
|
||||
{Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")},
|
||||
}
|
||||
|
||||
if !alertsEqual(expected, h.queue) {
|
||||
|
@ -261,25 +243,17 @@ func TestHandlerRelabel(t *testing.T) {
|
|||
})
|
||||
|
||||
// This alert should be dropped due to the configuration
|
||||
h.Send(&model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "drop",
|
||||
},
|
||||
h.Send(&Alert{
|
||||
Labels: labels.FromStrings("alertname", "drop"),
|
||||
})
|
||||
|
||||
// This alert should be replaced due to the configuration
|
||||
h.Send(&model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "rename",
|
||||
},
|
||||
h.Send(&Alert{
|
||||
Labels: labels.FromStrings("alertname", "rename"),
|
||||
})
|
||||
|
||||
expected := []*model.Alert{
|
||||
{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "renamed",
|
||||
},
|
||||
},
|
||||
expected := []*Alert{
|
||||
{Labels: labels.FromStrings("alertname", "renamed")},
|
||||
}
|
||||
|
||||
if !alertsEqual(expected, h.queue) {
|
||||
|
@ -291,7 +265,7 @@ func TestHandlerQueueing(t *testing.T) {
|
|||
var (
|
||||
unblock = make(chan struct{})
|
||||
called = make(chan struct{})
|
||||
expected model.Alerts
|
||||
expected []*Alert
|
||||
)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -300,7 +274,7 @@ func TestHandlerQueueing(t *testing.T) {
|
|||
|
||||
defer r.Body.Close()
|
||||
|
||||
var alerts model.Alerts
|
||||
var alerts []*Alert
|
||||
if err := json.NewDecoder(r.Body).Decode(&alerts); err != nil {
|
||||
t.Fatalf("Unexpected error on input decoding: %s", err)
|
||||
}
|
||||
|
@ -324,12 +298,11 @@ func TestHandlerQueueing(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
var alerts model.Alerts
|
||||
var alerts []*Alert
|
||||
|
||||
for i := range make([]struct{}, 20*maxBatchSize) {
|
||||
alerts = append(alerts, &model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": model.LabelValue(fmt.Sprintf("%d", i)),
|
||||
},
|
||||
alerts = append(alerts, &Alert{
|
||||
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,17 @@ func (ls Labels) MarshalJSON() ([]byte, error) {
|
|||
return json.Marshal(ls.Map())
|
||||
}
|
||||
|
||||
func (ls *Labels) UnmarshalJSON(b []byte) error {
|
||||
var m map[string]string
|
||||
|
||||
if err := json.Unmarshal(b, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*ls = FromMap(m)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Hash returns a hash value for the label set.
|
||||
func (ls Labels) Hash() uint64 {
|
||||
b := make([]byte, 0, 1024)
|
||||
|
|
Loading…
Reference in a new issue