diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 37ac2adf8f..7f949584c9 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -23,18 +23,19 @@ import ( "net/http" "net/http/httptest" "net/url" + "sync/atomic" "testing" "time" - "github.com/prometheus/prometheus/pkg/relabel" - - yaml "gopkg.in/yaml.v2" - + "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" + yaml "gopkg.in/yaml.v2" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/util/testutil" ) @@ -79,89 +80,71 @@ func TestHandlerNextBatch(t *testing.T) { expected := append([]*Alert{}, h.queue...) - b := h.nextBatch() - - testutil.Equals(t, maxBatchSize, len(b)) - - testutil.Assert(t, alertsEqual(expected[0:maxBatchSize], b), "First batch did not match") - - b = h.nextBatch() - - testutil.Equals(t, maxBatchSize, len(b)) - - testutil.Assert(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], b), "Second batch did not match") - - b = h.nextBatch() - - testutil.Equals(t, 1, len(b)) - - testutil.Assert(t, alertsEqual(expected[2*maxBatchSize:], b), "Third batch did not match") - + testutil.Ok(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch())) + testutil.Ok(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch())) + testutil.Ok(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch())) testutil.Assert(t, len(h.queue) == 0, "Expected queue to be empty but got %d alerts", len(h.queue)) } -func alertsEqual(a, b []*Alert) bool { +func alertsEqual(a, b []*Alert) error { if len(a) != len(b) { - fmt.Println("len mismatch") - return false + return errors.Errorf("length mismatch: %v != %v", a, b) } for i, alert := range a { if !labels.Equal(alert.Labels, b[i].Labels) { - fmt.Println("mismatch", alert.Labels, b[i].Labels) - return false + return errors.Errorf("label mismatch at index %d: %s != %s", i, alert.Labels, b[i].Labels) } } - return true + return nil } func TestHandlerSendAll(t *testing.T) { var ( - expected []*Alert - status1, status2 int + errc = make(chan error, 1) + expected = make([]*Alert, 0, maxBatchSize) + status1, status2 = int32(http.StatusOK), int32(http.StatusOK) ) - f := func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - var alerts []*Alert - testutil.Ok(t, json.NewDecoder(r.Body).Decode(&alerts)) - - testutil.Assert(t, alertsEqual(alerts, expected), "Unexpected alerts received %v exp %v", alerts, expected) + newHTTPServer := func(u, p string, status *int32) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + defer func() { + if err == nil { + return + } + select { + case errc <- err: + default: + } + }() + user, pass, _ := r.BasicAuth() + if user != u || pass != p { + err = errors.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p) + w.WriteHeader(http.StatusInternalServerError) + return + } + var alerts []*Alert + err = json.NewDecoder(r.Body).Decode(&alerts) + if err == nil { + err = alertsEqual(expected, alerts) + } + w.WriteHeader(int(atomic.LoadInt32(status))) + })) } - server1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - user, pass, _ := r.BasicAuth() - testutil.Assert( - t, - user == "prometheus" || pass == "testing_password", - "Incorrect auth details for an alertmanager", - ) - - f(w, r) - w.WriteHeader(status1) - })) - server2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - user, pass, _ := r.BasicAuth() - testutil.Assert( - t, - user == "" || pass == "", - "Incorrectly received auth details for an alertmanager", - ) - - f(w, r) - w.WriteHeader(status2) - })) - + server1 := newHTTPServer("prometheus", "testing_password", &status1) + server2 := newHTTPServer("", "", &status2) defer server1.Close() defer server2.Close() h := NewManager(&Options{}, nil) - authClient, _ := config_util.NewClientFromConfig(config_util.HTTPClientConfig{ - BasicAuth: &config_util.BasicAuth{ - Username: "prometheus", - Password: "testing_password", - }, - }, "auth_alertmanager", false) + authClient, _ := config_util.NewClientFromConfig( + config_util.HTTPClientConfig{ + BasicAuth: &config_util.BasicAuth{ + Username: "prometheus", + Password: "testing_password", + }, + }, "auth_alertmanager", false) h.alertmanagers = make(map[string]*alertmanagerSet) @@ -199,15 +182,25 @@ func TestHandlerSendAll(t *testing.T) { }) } - status1 = http.StatusOK - status2 = http.StatusOK - testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr := func() { + t.Helper() + select { + case err := <-errc: + testutil.Ok(t, err) + default: + } + } - status1 = http.StatusNotFound testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() - status2 = http.StatusInternalServerError + atomic.StoreInt32(&status1, int32(http.StatusNotFound)) + testutil.Assert(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() + + atomic.StoreInt32(&status2, int32(http.StatusInternalServerError)) testutil.Assert(t, !h.sendAll(h.queue...), "all sends succeeded unexpectedly") + checkNoErr() } func TestCustomDo(t *testing.T) { @@ -268,7 +261,7 @@ func TestExternalLabels(t *testing.T) { {Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")}, } - testutil.Assert(t, alertsEqual(expected, h.queue), "Expected alerts %v, got %v", expected, h.queue) + testutil.Ok(t, alertsEqual(expected, h.queue)) } func TestHandlerRelabel(t *testing.T) { @@ -304,31 +297,49 @@ func TestHandlerRelabel(t *testing.T) { {Labels: labels.FromStrings("alertname", "renamed")}, } - testutil.Assert(t, alertsEqual(expected, h.queue), "Expected alerts %v, got %v", expected, h.queue) + testutil.Ok(t, alertsEqual(expected, h.queue)) } func TestHandlerQueueing(t *testing.T) { var ( - unblock = make(chan struct{}) - called = make(chan struct{}) - expected []*Alert + expectedc = make(chan []*Alert) + called = make(chan struct{}) + done = make(chan struct{}) + errc = make(chan error, 1) ) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - called <- struct{}{} - <-unblock + // Notify the test function that we have received something. + select { + case called <- struct{}{}: + case <-done: + return + } - defer r.Body.Close() - - var alerts []*Alert - testutil.Ok(t, json.NewDecoder(r.Body).Decode(&alerts)) - - testutil.Assert(t, alertsEqual(expected, alerts), "Expected alerts %v, got %v", expected, alerts) + // Wait for the test function to unblock us. + select { + case expected := <-expectedc: + var alerts []*Alert + err := json.NewDecoder(r.Body).Decode(&alerts) + if err == nil { + err = alertsEqual(expected, alerts) + } + select { + case errc <- err: + default: + } + case <-done: + } })) + defer func() { + close(done) + server.Close() + }() - h := NewManager(&Options{ - QueueCapacity: 3 * maxBatchSize, - }, + h := NewManager( + &Options{ + QueueCapacity: 3 * maxBatchSize, + }, nil, ) @@ -345,57 +356,64 @@ func TestHandlerQueueing(t *testing.T) { }, cfg: &am1Cfg, } + go h.Run(nil) + defer h.Stop() var alerts []*Alert - for i := range make([]struct{}, 20*maxBatchSize) { alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), }) } - c := make(chan map[string][]*targetgroup.Group) - go h.Run(c) - defer h.Stop() - - h.Send(alerts[:4*maxBatchSize]...) - - // If the batch is larger than the queue size, the front should be truncated - // from the front. Thus, we start at i=1. - for i := 1; i < 4; i++ { - select { - case <-called: - expected = alerts[i*maxBatchSize : (i+1)*maxBatchSize] - unblock <- struct{}{} - case <-time.After(5 * time.Second): - t.Fatalf("Alerts were not pushed") + assertAlerts := func(expected []*Alert) { + t.Helper() + for { + select { + case <-called: + expectedc <- expected + case err := <-errc: + testutil.Ok(t, err) + return + case <-time.After(5 * time.Second): + t.Fatalf("Alerts were not pushed") + } } } - // Send one batch, wait for it to arrive and block so the queue fills up. - // Then check whether the queue is truncated in the front once its full. + // If the batch is larger than the queue capacity, it should be truncated + // from the front. + h.Send(alerts[:4*maxBatchSize]...) + for i := 1; i < 4; i++ { + assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize]) + } + + // Send one batch, wait for it to arrive and block the server so the queue fills up. h.Send(alerts[:maxBatchSize]...) <-called - // Fill the 3*maxBatchSize queue. - h.Send(alerts[1*maxBatchSize : 2*maxBatchSize]...) + // Send several batches while the server is still blocked so the queue + // fills up to its maximum capacity (3*maxBatchSize). Then check that the + // queue is truncated in the front. + h.Send(alerts[1*maxBatchSize : 2*maxBatchSize]...) // this batch should be dropped. h.Send(alerts[2*maxBatchSize : 3*maxBatchSize]...) h.Send(alerts[3*maxBatchSize : 4*maxBatchSize]...) // Send the batch that drops the first one. h.Send(alerts[4*maxBatchSize : 5*maxBatchSize]...) - expected = alerts[:maxBatchSize] - unblock <- struct{}{} + // Unblock the server. + expectedc <- alerts[:maxBatchSize] + select { + case err := <-errc: + testutil.Ok(t, err) + case <-time.After(5 * time.Second): + t.Fatalf("Alerts were not pushed") + } + // Verify that we receive the last 3 batches. for i := 2; i < 5; i++ { - select { - case <-called: - expected = alerts[i*maxBatchSize : (i+1)*maxBatchSize] - unblock <- struct{}{} - case <-time.After(5 * time.Second): - t.Fatalf("Alerts were not pushed") - } + assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize]) } }