chore(notifier): add a reproducer for https://github.com/prometheus/prometheus/issues/13676

The reproducer shows "targets groups update" starvation when the notifications queue is full and an Alertmanager is down. The existing `TestHangingNotifier` added in https://github.com/prometheus/prometheus/pull/10948 doesn't really reflect reality, as the SD changes are fed manually and continuously into `syncCh`, whereas in reality updates are only resent every `updatert`. The test added here sets up an SD manager and links it to the notifier; the SD changes are triggered by that manager, as they are in reality.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Co-authored-by: Ethan Hunter <ehunter@hudson-trading.com>
commit 94d28cd6cf (parent 545d31f184)
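Before the diff itself, a minimal, self-contained sketch of the failure mode described above may help. It is illustrative only and does not reuse the notifier's actual code: the channel names, the `amTimeout`/`updatert` constants, and all timings are made-up stand-ins for `AlertmanagerConfig.Timeout`, the SD manager's `updatert`, and the notifier's run loop.

package main

import (
    "fmt"
    "time"
)

func main() {
    const (
        amTimeout = 200 * time.Millisecond // stand-in for AlertmanagerConfig.Timeout
        updatert  = 100 * time.Millisecond // stand-in for the SD manager's updatert
    )

    syncCh := make(chan string)          // stand-in for sdManager.SyncCh()
    moreAlerts := make(chan struct{}, 1) // "the queue is still non-empty" signal
    moreAlerts <- struct{}{}

    // SD side: an update is only offered every updatert; if the notifier loop
    // is busy sending, the offer is skipped and retried on the next tick.
    go func() {
        for {
            time.Sleep(updatert)
            select {
            case syncCh <- "drop the faulty Alertmanager":
            default:
            }
        }
    }()

    deadline := time.After(2 * time.Second)
    for {
        select {
        case update := <-syncCh:
            fmt.Println("SD update applied:", update)
            return
        case <-moreAlerts:
            time.Sleep(amTimeout)    // the batch send hangs until the Alertmanager times out
            moreAlerts <- struct{}{} // the queue is still not empty, so go straight back to sending
        case <-deadline:
            fmt.Println("SD update starved for the whole run")
            return
        }
    }
}

Because the sending case is almost always ready and each pass blocks for amTimeout, the syncCh case practically never fires before the deadline, which is the "targets groups update" starvation the test below reproduces.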
discovery/manager.go
@@ -120,6 +120,16 @@ func Name(n string) func(*Manager) {
     }
 }
 
+// Updatert sets the updatert of the manager.
+// Used to speed up tests.
+func Updatert(u time.Duration) func(*Manager) {
+    return func(m *Manager) {
+        m.mtx.Lock()
+        defer m.mtx.Unlock()
+        m.updatert = u
+    }
+}
+
 // HTTPClientOptions sets the list of HTTP client options to expose to
 // Discoverers. It is up to Discoverers to choose to use the options provided.
 func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) {
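As a quick follow-up, the new option is passed to `discovery.NewManager` like any other functional option. The snippet below simply mirrors how the test added further down wires it up (`ctx`, `reg`, and `sdMetrics` are assumed to be created as in that test):

sdManager := discovery.NewManager(
    ctx,
    log.NewNopLogger(),
    reg,
    sdMetrics,
    discovery.Name("sd-manager"),
    discovery.Updatert(10*time.Millisecond), // resend SD updates far more often than the production default
)
go sdManager.Run()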
notifier/notifier_test.go
@@ -26,13 +26,17 @@ import (
     "testing"
     "time"
 
+    "github.com/go-kit/log"
     "github.com/prometheus/alertmanager/api/v2/models"
+    "github.com/prometheus/client_golang/prometheus"
     config_util "github.com/prometheus/common/config"
     "github.com/prometheus/common/model"
     "github.com/stretchr/testify/require"
     "go.uber.org/atomic"
     "gopkg.in/yaml.v2"
 
+    "github.com/prometheus/prometheus/discovery"
+
     "github.com/prometheus/prometheus/config"
     "github.com/prometheus/prometheus/discovery/targetgroup"
     "github.com/prometheus/prometheus/model/labels"
@@ -811,3 +815,148 @@ func TestHangingNotifier(t *testing.T) {
         })
     }
 }
+
+// TODO: rename it and even replace TestHangingNotifier with it.
+// TestHangingNotifierXXX ensures that the notifier takes into account SD changes even when there are
+// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
+func TestHangingNotifierXXX(t *testing.T) {
+    const (
+        batches     = 100
+        alertsCount = maxBatchSize * batches
+    )
+
+    var (
+        sendTimeout = 10 * time.Millisecond
+        sdUpdatert  = sendTimeout / 2
+
+        done = make(chan struct{})
+    )
+
+    defer func() {
+        close(done)
+    }()
+
+    // Set up a faulty Alertmanager.
+    var faultyCalled atomic.Bool
+    faultyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        faultyCalled.Store(true)
+        select {
+        case <-done:
+        case <-time.After(time.Hour):
+        }
+    }))
+    faultyURL, err := url.Parse(faultyServer.URL)
+    require.NoError(t, err)
+
+    // Set up a functional Alertmanager.
+    var functionalCalled atomic.Bool
+    functionalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        functionalCalled.Store(true)
+    }))
+    functionalURL, err := url.Parse(functionalServer.URL)
+    require.NoError(t, err)
+
+    // Initialize the discovery manager
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    reg := prometheus.NewRegistry()
+    sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
+    require.NoError(t, err)
+    sdManager := discovery.NewManager(
+        ctx,
+        log.NewNopLogger(),
+        reg,
+        sdMetrics,
+        discovery.Name("sd-manager"),
+        discovery.Updatert(sdUpdatert),
+    )
+    go sdManager.Run()
+
+    // Set up the notifier with both faulty and functional Alertmanagers.
+    notifier := NewManager(
+        &Options{
+            QueueCapacity: alertsCount,
+        },
+        nil,
+    )
+    notifier.alertmanagers = make(map[string]*alertmanagerSet)
+    amCfg := config.DefaultAlertmanagerConfig
+    amCfg.Timeout = model.Duration(sendTimeout)
+    notifier.alertmanagers["config-0"] = &alertmanagerSet{
+        ams: []alertmanager{
+            alertmanagerMock{
+                urlf: func() string { return faultyURL.String() },
+            },
+            alertmanagerMock{
+                urlf: func() string { return functionalURL.String() },
+            },
+        },
+        cfg:     &amCfg,
+        metrics: notifier.metrics,
+    }
+    go notifier.Run(sdManager.SyncCh())
+    defer notifier.Stop()
+
+    require.Len(t, notifier.Alertmanagers(), 2)
+
+    // Enqueue the alerts.
+    var alerts []*Alert
+    for i := range make([]struct{}, alertsCount) {
+        alerts = append(alerts, &Alert{
+            Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
+        })
+    }
+    notifier.Send(alerts...)
+
+    // Wait for the Alertmanagers to start receiving alerts.
+    // 10*sdUpdatert is used as an arbitrary timeout here.
+    timeout := time.After(10 * sdUpdatert)
+loop1:
+    for {
+        select {
+        case <-timeout:
+            t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.")
+        default:
+            if faultyCalled.Load() && functionalCalled.Load() {
+                break loop1
+            }
+        }
+    }
+
+    // Request to remove the faulty Alertmanager.
+    c := map[string]discovery.Configs{
+        "config-0": {
+            discovery.StaticConfig{
+                &targetgroup.Group{
+                    Targets: []model.LabelSet{
+                        {
+                            model.AddressLabel: model.LabelValue(functionalURL.Host),
+                        },
+                    },
+                },
+            },
+        },
+    }
+    require.NoError(t, sdManager.ApplyConfig(c))
+
+    // The notifier should not wait until the alerts queue is empty to apply the discovery changes
+    // A faulty Alertmanager could cause each alert sending cycle to take up to AlertmanagerConfig.Timeout
+    // The queue may never be emptied, as the arrival rate could be larger than the departure rate
+    // It could even overflow and alerts could be dropped.
+    timeout = time.After(batches * sendTimeout)
+loop2:
+    for {
+        select {
+        case <-timeout:
+            t.Fatalf("Timeout, the faulty alertmanager not removed on time.")
+        default:
+            // The faulty alertmanager was dropped.
+            if len(notifier.Alertmanagers()) == 1 {
+                // Prevent from TOCTOU.
+                require.Positive(t, notifier.queueLen())
+                break loop2
+            }
+            require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.")
+        }
+    }
+}
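Assuming the test lives in the notifier package, as the diff context suggests, the reproducer can be exercised on its own with the standard Go test runner:

go test ./notifier/ -run TestHangingNotifierXXX -v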