mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
discovery/marathon: Fix race conditions in test
The concurrency applied before is in most cases not even needed. With a cap=1 channel, most tests are much cleaner. TestMarathonSDRunAndStop was trickier. It could even have blocked before. This also includes a general refactoring of the whole file.
This commit is contained in:
parent
bc58d3dc50
commit
03adbe57e4
|
@ -24,108 +24,114 @@ import (
|
|||
"github.com/prometheus/prometheus/config"
|
||||
)
|
||||
|
||||
var marathonValidLabel = map[string]string{"prometheus": "yes"}
|
||||
var (
|
||||
marathonValidLabel = map[string]string{"prometheus": "yes"}
|
||||
testServers = []string{"http://localhost:8080"}
|
||||
)
|
||||
|
||||
func newTestDiscovery(client AppListClient) (chan []*config.TargetGroup, *Discovery) {
|
||||
ch := make(chan []*config.TargetGroup)
|
||||
md := &Discovery{
|
||||
Servers: []string{"http://localhost:8080"},
|
||||
func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
|
||||
md := Discovery{
|
||||
Servers: testServers,
|
||||
Client: client,
|
||||
}
|
||||
return ch, md
|
||||
return md.updateServices(context.Background(), ch)
|
||||
}
|
||||
|
||||
func TestMarathonSDHandleError(t *testing.T) {
|
||||
var errTesting = errors.New("testing failure")
|
||||
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
|
||||
return nil, errTesting
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
t.Fatalf("Got group: %s", tg)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(context.Background(), ch)
|
||||
if err != errTesting {
|
||||
var (
|
||||
errTesting = errors.New("testing failure")
|
||||
ch = make(chan []*config.TargetGroup, 1)
|
||||
client = func(url string) (*AppList, error) { return nil, errTesting }
|
||||
)
|
||||
if err := testUpdateServices(client, ch); err != errTesting {
|
||||
t.Fatalf("Expected error: %s", err)
|
||||
}
|
||||
select {
|
||||
case tg := <-ch:
|
||||
t.Fatalf("Got group: %s", tg)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDEmptyList(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
|
||||
return &AppList{}, nil
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
if len(tg) > 0 {
|
||||
t.Fatalf("Got group: %v", tg)
|
||||
}
|
||||
default:
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(context.Background(), ch)
|
||||
if err != nil {
|
||||
var (
|
||||
ch = make(chan []*config.TargetGroup, 1)
|
||||
client = func(url string) (*AppList, error) { return &AppList{}, nil }
|
||||
)
|
||||
if err := testUpdateServices(client, ch); err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
select {
|
||||
case tg := <-ch:
|
||||
if len(tg) > 0 {
|
||||
t.Fatalf("Got group: %v", tg)
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func marathonTestAppList(labels map[string]string, runningTasks int) *AppList {
|
||||
task := Task{
|
||||
ID: "test-task-1",
|
||||
Host: "mesos-slave1",
|
||||
Ports: []uint32{31000},
|
||||
}
|
||||
docker := DockerContainer{Image: "repo/image:tag"}
|
||||
container := Container{Docker: docker}
|
||||
app := App{
|
||||
ID: "test-service",
|
||||
Tasks: []Task{task},
|
||||
RunningTasks: runningTasks,
|
||||
Labels: labels,
|
||||
Container: container,
|
||||
}
|
||||
var (
|
||||
task = Task{
|
||||
ID: "test-task-1",
|
||||
Host: "mesos-slave1",
|
||||
Ports: []uint32{31000},
|
||||
}
|
||||
docker = DockerContainer{Image: "repo/image:tag"}
|
||||
container = Container{Docker: docker}
|
||||
app = App{
|
||||
ID: "test-service",
|
||||
Tasks: []Task{task},
|
||||
RunningTasks: runningTasks,
|
||||
Labels: labels,
|
||||
Container: container,
|
||||
}
|
||||
)
|
||||
return &AppList{
|
||||
Apps: []App{app},
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDSendGroup(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tgs := <-ch:
|
||||
tg := tgs[0]
|
||||
|
||||
if tg.Source != "test-service" {
|
||||
t.Fatalf("Wrong target group name: %s", tg.Source)
|
||||
}
|
||||
if len(tg.Targets) != 1 {
|
||||
t.Fatalf("Wrong number of targets: %v", tg.Targets)
|
||||
}
|
||||
tgt := tg.Targets[0]
|
||||
if tgt[model.AddressLabel] != "mesos-slave1:31000" {
|
||||
t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel])
|
||||
}
|
||||
default:
|
||||
t.Fatal("Did not get a target group.")
|
||||
var (
|
||||
ch = make(chan []*config.TargetGroup, 1)
|
||||
client = func(url string) (*AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(context.Background(), ch)
|
||||
if err != nil {
|
||||
)
|
||||
if err := testUpdateServices(client, ch); err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
select {
|
||||
case tgs := <-ch:
|
||||
tg := tgs[0]
|
||||
|
||||
if tg.Source != "test-service" {
|
||||
t.Fatalf("Wrong target group name: %s", tg.Source)
|
||||
}
|
||||
if len(tg.Targets) != 1 {
|
||||
t.Fatalf("Wrong number of targets: %v", tg.Targets)
|
||||
}
|
||||
tgt := tg.Targets[0]
|
||||
if tgt[model.AddressLabel] != "mesos-slave1:31000" {
|
||||
t.Fatalf("Wrong target address: %s", tgt[model.AddressLabel])
|
||||
}
|
||||
default:
|
||||
t.Fatal("Did not get a target group.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDRemoveApp(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
|
||||
var (
|
||||
ch = make(chan []*config.TargetGroup)
|
||||
client = func(url string) (*AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
}
|
||||
md = Discovery{
|
||||
Servers: testServers,
|
||||
Client: client,
|
||||
}
|
||||
)
|
||||
go func() {
|
||||
up1 := (<-ch)[0]
|
||||
up2 := (<-ch)[0]
|
||||
|
@ -151,73 +157,80 @@ func TestMarathonSDRemoveApp(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMarathonSDRunAndStop(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
md.RefreshInterval = time.Millisecond * 10
|
||||
var (
|
||||
ch = make(chan []*config.TargetGroup)
|
||||
client = func(url string) (*AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
}
|
||||
md = Discovery{
|
||||
Servers: testServers,
|
||||
Client: client,
|
||||
RefreshInterval: time.Millisecond * 10,
|
||||
}
|
||||
)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ch:
|
||||
cancel()
|
||||
case <-time.After(md.RefreshInterval * 3):
|
||||
cancel()
|
||||
t.Fatalf("Update took too long.")
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
cancel()
|
||||
case <-time.After(md.RefreshInterval * 3):
|
||||
cancel()
|
||||
t.Fatalf("Update took too long.")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
md.Run(ctx, ch)
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatalf("Channel not closed.")
|
||||
}
|
||||
}
|
||||
|
||||
func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *AppList {
|
||||
task := Task{
|
||||
ID: "test-task-2",
|
||||
Host: "mesos-slave-2",
|
||||
Ports: []uint32{},
|
||||
}
|
||||
docker := DockerContainer{Image: "repo/image:tag"}
|
||||
container := Container{Docker: docker}
|
||||
app := App{
|
||||
ID: "test-service-zero-ports",
|
||||
Tasks: []Task{task},
|
||||
RunningTasks: runningTasks,
|
||||
Labels: labels,
|
||||
Container: container,
|
||||
}
|
||||
var (
|
||||
task = Task{
|
||||
ID: "test-task-2",
|
||||
Host: "mesos-slave-2",
|
||||
Ports: []uint32{},
|
||||
}
|
||||
docker = DockerContainer{Image: "repo/image:tag"}
|
||||
container = Container{Docker: docker}
|
||||
app = App{
|
||||
ID: "test-service-zero-ports",
|
||||
Tasks: []Task{task},
|
||||
RunningTasks: runningTasks,
|
||||
Labels: labels,
|
||||
Container: container,
|
||||
}
|
||||
)
|
||||
return &AppList{
|
||||
Apps: []App{app},
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonZeroTaskPorts(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*AppList, error) {
|
||||
return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case tgs := <-ch:
|
||||
tg := tgs[0]
|
||||
|
||||
if tg.Source != "test-service-zero-ports" {
|
||||
t.Fatalf("Wrong target group name: %s", tg.Source)
|
||||
}
|
||||
if len(tg.Targets) != 0 {
|
||||
t.Fatalf("Wrong number of targets: %v", tg.Targets)
|
||||
}
|
||||
default:
|
||||
t.Fatal("Did not get a target group.")
|
||||
var (
|
||||
ch = make(chan []*config.TargetGroup, 1)
|
||||
client = func(url string) (*AppList, error) {
|
||||
return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(context.Background(), ch)
|
||||
if err != nil {
|
||||
)
|
||||
if err := testUpdateServices(client, ch); err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
select {
|
||||
case tgs := <-ch:
|
||||
tg := tgs[0]
|
||||
|
||||
if tg.Source != "test-service-zero-ports" {
|
||||
t.Fatalf("Wrong target group name: %s", tg.Source)
|
||||
}
|
||||
if len(tg.Targets) != 0 {
|
||||
t.Fatalf("Wrong number of targets: %v", tg.Targets)
|
||||
}
|
||||
default:
|
||||
t.Fatal("Did not get a target group.")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue