From 9f8feb9ff67234b70503ac1c2a62246e931a0798 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 25 Apr 2016 15:30:11 +0200 Subject: [PATCH] discovery: consolidate Marathon SD files --- retrieval/discovery/marathon.go | 79 +------ retrieval/discovery/marathon/client.go | 47 ---- retrieval/discovery/marathon/constants.go | 32 --- retrieval/discovery/marathon/conversion.go | 71 ------ retrieval/discovery/marathon/marathon.go | 222 ++++++++++++++++++ .../discovery/{ => marathon}/marathon_test.go | 45 ++-- retrieval/discovery/marathon/objects.go | 45 ---- retrieval/discovery/marathon/url.go | 29 --- retrieval/targetmanager.go | 2 +- 9 files changed, 252 insertions(+), 320 deletions(-) delete mode 100644 retrieval/discovery/marathon/client.go delete mode 100644 retrieval/discovery/marathon/constants.go delete mode 100644 retrieval/discovery/marathon/conversion.go create mode 100644 retrieval/discovery/marathon/marathon.go rename retrieval/discovery/{ => marathon}/marathon_test.go (75%) delete mode 100644 retrieval/discovery/marathon/objects.go delete mode 100644 retrieval/discovery/marathon/url.go diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go index 6361a7873..e6f8edcf7 100644 --- a/retrieval/discovery/marathon.go +++ b/retrieval/discovery/marathon.go @@ -16,81 +16,16 @@ package discovery import ( "time" - "github.com/prometheus/common/log" - "golang.org/x/net/context" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) -// MarathonDiscovery provides service discovery based on a Marathon instance. -type MarathonDiscovery struct { - servers []string - refreshInterval time.Duration - done chan struct{} - lastRefresh map[string]*config.TargetGroup - client marathon.AppListClient -} - -// NewMarathonDiscovery creates a new Marathon based discovery. -func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery { - return &MarathonDiscovery{ - servers: conf.Servers, - refreshInterval: time.Duration(conf.RefreshInterval), - done: make(chan struct{}), - client: marathon.FetchMarathonApps, +// NewMarathon creates a new Marathon based discovery. +func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery { + return &marathon.Discovery{ + Servers: conf.Servers, + RefreshInterval: time.Duration(conf.RefreshInterval), + Done: make(chan struct{}), + Client: marathon.FetchApps, } } - -// Run implements the TargetProvider interface. -func (md *MarathonDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - - for { - select { - case <-ctx.Done(): - return - case <-time.After(md.refreshInterval): - err := md.updateServices(ch) - if err != nil { - log.Errorf("Error while updating services: %s", err) - } - } - } -} - -func (md *MarathonDiscovery) updateServices(ch chan<- []*config.TargetGroup) error { - targetMap, err := md.fetchTargetGroups() - if err != nil { - return err - } - - all := make([]*config.TargetGroup, 0, len(targetMap)) - for _, tg := range targetMap { - all = append(all, tg) - } - ch <- all - - // Remove services which did disappear - for source := range md.lastRefresh { - _, ok := targetMap[source] - if !ok { - log.Debugf("Removing group for %s", source) - ch <- []*config.TargetGroup{{Source: source}} - } - } - - md.lastRefresh = targetMap - return nil -} - -func (md *MarathonDiscovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { - url := marathon.RandomAppsURL(md.servers) - apps, err := md.client(url) - if err != nil { - return nil, err - } - - groups := marathon.AppsToTargetGroups(apps) - return groups, nil -} diff --git a/retrieval/discovery/marathon/client.go b/retrieval/discovery/marathon/client.go deleted file mode 100644 index 486b87383..000000000 --- a/retrieval/discovery/marathon/client.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package marathon - -import ( - "encoding/json" - "io/ioutil" - "net/http" -) - -// AppListClient defines a function that can be used to get an application list from marathon. -type AppListClient func(url string) (*AppList, error) - -// FetchMarathonApps requests a list of applications from a marathon server. -func FetchMarathonApps(url string) (*AppList, error) { - resp, err := http.Get(url) - if err != nil { - return nil, err - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - return parseAppJSON(body) -} - -func parseAppJSON(body []byte) (*AppList, error) { - apps := &AppList{} - err := json.Unmarshal(body, apps) - if err != nil { - return nil, err - } - return apps, nil -} diff --git a/retrieval/discovery/marathon/constants.go b/retrieval/discovery/marathon/constants.go deleted file mode 100644 index bffe8939d..000000000 --- a/retrieval/discovery/marathon/constants.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package marathon - -import ( - "github.com/prometheus/common/model" -) - -const ( - // metaLabelPrefix is the meta prefix used for all meta labels in this discovery. - metaLabelPrefix = model.MetaLabelPrefix + "marathon_" - // appLabelPrefix is the prefix for the application labels. - appLabelPrefix = metaLabelPrefix + "app_label_" - - // appLabel is used for the name of the app in Marathon. - appLabel model.LabelName = metaLabelPrefix + "app" - // imageLabel is the label that is used for the docker image running the service. - imageLabel model.LabelName = metaLabelPrefix + "image" - // taskLabel contains the mesos task name of the app instance. - taskLabel model.LabelName = metaLabelPrefix + "task" -) diff --git a/retrieval/discovery/marathon/conversion.go b/retrieval/discovery/marathon/conversion.go deleted file mode 100644 index 7c60e32a1..000000000 --- a/retrieval/discovery/marathon/conversion.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package marathon - -import ( - "fmt" - - "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/config" -) - -// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups. -func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup { - tgroups := map[string]*config.TargetGroup{} - for _, a := range apps.Apps { - group := createTargetGroup(&a) - tgroups[group.Source] = group - } - return tgroups -} - -func createTargetGroup(app *App) *config.TargetGroup { - var ( - targets = targetsForApp(app) - appName = model.LabelValue(app.ID) - image = model.LabelValue(app.Container.Docker.Image) - ) - tg := &config.TargetGroup{ - Targets: targets, - Labels: model.LabelSet{ - appLabel: appName, - imageLabel: image, - }, - Source: app.ID, - } - - for ln, lv := range app.Labels { - ln = appLabelPrefix + ln - tg.Labels[model.LabelName(ln)] = model.LabelValue(lv) - } - - return tg -} - -func targetsForApp(app *App) []model.LabelSet { - targets := make([]model.LabelSet, 0, len(app.Tasks)) - for _, t := range app.Tasks { - target := targetForTask(&t) - targets = append(targets, model.LabelSet{ - model.AddressLabel: model.LabelValue(target), - taskLabel: model.LabelValue(t.ID), - }) - } - return targets -} - -func targetForTask(task *Task) string { - return fmt.Sprintf("%s:%d", task.Host, task.Ports[0]) -} diff --git a/retrieval/discovery/marathon/marathon.go b/retrieval/discovery/marathon/marathon.go new file mode 100644 index 000000000..905e8ac5f --- /dev/null +++ b/retrieval/discovery/marathon/marathon.go @@ -0,0 +1,222 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package marathon + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "time" + + "golang.org/x/net/context" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +const ( + // metaLabelPrefix is the meta prefix used for all meta labels in this discovery. + metaLabelPrefix = model.MetaLabelPrefix + "marathon_" + // appLabelPrefix is the prefix for the application labels. + appLabelPrefix = metaLabelPrefix + "app_label_" + + // appLabel is used for the name of the app in Marathon. + appLabel model.LabelName = metaLabelPrefix + "app" + // imageLabel is the label that is used for the docker image running the service. + imageLabel model.LabelName = metaLabelPrefix + "image" + // taskLabel contains the mesos task name of the app instance. + taskLabel model.LabelName = metaLabelPrefix + "task" +) + +const appListPath string = "/v2/apps/?embed=apps.tasks" + +// Discovery provides service discovery based on a Marathon instance. +type Discovery struct { + Servers []string + RefreshInterval time.Duration + Done chan struct{} + lastRefresh map[string]*config.TargetGroup + Client AppListClient +} + +// Run implements the TargetProvider interface. +func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + defer close(ch) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(md.RefreshInterval): + err := md.updateServices(ch) + if err != nil { + log.Errorf("Error while updating services: %s", err) + } + } + } +} + +func (md *Discovery) updateServices(ch chan<- []*config.TargetGroup) error { + targetMap, err := md.fetchTargetGroups() + if err != nil { + return err + } + + all := make([]*config.TargetGroup, 0, len(targetMap)) + for _, tg := range targetMap { + all = append(all, tg) + } + ch <- all + + // Remove services which did disappear + for source := range md.lastRefresh { + _, ok := targetMap[source] + if !ok { + log.Debugf("Removing group for %s", source) + ch <- []*config.TargetGroup{{Source: source}} + } + } + + md.lastRefresh = targetMap + return nil +} + +func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { + url := RandomAppsURL(md.Servers) + apps, err := md.Client(url) + if err != nil { + return nil, err + } + + groups := AppsToTargetGroups(apps) + return groups, nil +} + +// Task describes one instance of a service running on Marathon. +type Task struct { + ID string `json:"id"` + Host string `json:"host"` + Ports []uint32 `json:"ports"` +} + +// DockerContainer describes a container which uses the docker runtime. +type DockerContainer struct { + Image string `json:"image"` +} + +// Container describes the runtime an app in running in. +type Container struct { + Docker DockerContainer `json:"docker"` +} + +// App describes a service running on Marathon. +type App struct { + ID string `json:"id"` + Tasks []Task `json:"tasks"` + RunningTasks int `json:"tasksRunning"` + Labels map[string]string `json:"labels"` + Container Container `json:"container"` +} + +// AppList is a list of Marathon apps. +type AppList struct { + Apps []App `json:"apps"` +} + +// AppListClient defines a function that can be used to get an application list from marathon. +type AppListClient func(url string) (*AppList, error) + +// FetchApps requests a list of applications from a marathon server. +func FetchApps(url string) (*AppList, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return parseAppJSON(body) +} + +func parseAppJSON(body []byte) (*AppList, error) { + apps := &AppList{} + err := json.Unmarshal(body, apps) + if err != nil { + return nil, err + } + return apps, nil +} + +// RandomAppsURL randomly selects a server from an array and creates +// an URL pointing to the app list. +func RandomAppsURL(servers []string) string { + // TODO: If possible update server list from Marathon at some point. + server := servers[rand.Intn(len(servers))] + return fmt.Sprintf("%s%s", server, appListPath) +} + +// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups. +func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup { + tgroups := map[string]*config.TargetGroup{} + for _, a := range apps.Apps { + group := createTargetGroup(&a) + tgroups[group.Source] = group + } + return tgroups +} + +func createTargetGroup(app *App) *config.TargetGroup { + var ( + targets = targetsForApp(app) + appName = model.LabelValue(app.ID) + image = model.LabelValue(app.Container.Docker.Image) + ) + tg := &config.TargetGroup{ + Targets: targets, + Labels: model.LabelSet{ + appLabel: appName, + imageLabel: image, + }, + Source: app.ID, + } + + for ln, lv := range app.Labels { + ln = appLabelPrefix + ln + tg.Labels[model.LabelName(ln)] = model.LabelValue(lv) + } + + return tg +} + +func targetsForApp(app *App) []model.LabelSet { + targets := make([]model.LabelSet, 0, len(app.Tasks)) + for _, t := range app.Tasks { + target := targetForTask(&t) + targets = append(targets, model.LabelSet{ + model.AddressLabel: model.LabelValue(target), + taskLabel: model.LabelValue(t.ID), + }) + } + return targets +} + +func targetForTask(task *Task) string { + return fmt.Sprintf("%s:%d", task.Host, task.Ports[0]) +} diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon/marathon_test.go similarity index 75% rename from retrieval/discovery/marathon_test.go rename to retrieval/discovery/marathon/marathon_test.go index cc58a1380..98a6c5e89 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon/marathon_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discovery +package marathon import ( "errors" @@ -22,23 +22,22 @@ import ( "golang.org/x/net/context" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) var marathonValidLabel = map[string]string{"prometheus": "yes"} -func newTestDiscovery(client marathon.AppListClient) (chan []*config.TargetGroup, *MarathonDiscovery) { +func newTestDiscovery(client AppListClient) (chan []*config.TargetGroup, *Discovery) { ch := make(chan []*config.TargetGroup) - md := NewMarathonDiscovery(&config.MarathonSDConfig{ + md := &Discovery{ Servers: []string{"http://localhost:8080"}, - }) - md.client = client + } + md.Client = client return ch, md } func TestMarathonSDHandleError(t *testing.T) { var errTesting = errors.New("testing failure") - ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + ch, md := newTestDiscovery(func(url string) (*AppList, error) { return nil, errTesting }) go func() { @@ -55,8 +54,8 @@ func TestMarathonSDHandleError(t *testing.T) { } func TestMarathonSDEmptyList(t *testing.T) { - ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { - return &marathon.AppList{}, nil + ch, md := newTestDiscovery(func(url string) (*AppList, error) { + return &AppList{}, nil }) go func() { select { @@ -73,28 +72,28 @@ func TestMarathonSDEmptyList(t *testing.T) { } } -func marathonTestAppList(labels map[string]string, runningTasks int) *marathon.AppList { - task := marathon.Task{ +func marathonTestAppList(labels map[string]string, runningTasks int) *AppList { + task := Task{ ID: "test-task-1", Host: "mesos-slave1", Ports: []uint32{31000}, } - docker := marathon.DockerContainer{Image: "repo/image:tag"} - container := marathon.Container{Docker: docker} - app := marathon.App{ + docker := DockerContainer{Image: "repo/image:tag"} + container := Container{Docker: docker} + app := App{ ID: "test-service", - Tasks: []marathon.Task{task}, + Tasks: []Task{task}, RunningTasks: runningTasks, Labels: labels, Container: container, } - return &marathon.AppList{ - Apps: []marathon.App{app}, + return &AppList{ + Apps: []App{app}, } } func TestMarathonSDSendGroup(t *testing.T) { - ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + ch, md := newTestDiscovery(func(url string) (*AppList, error) { return marathonTestAppList(marathonValidLabel, 1), nil }) go func() { @@ -123,7 +122,7 @@ func TestMarathonSDSendGroup(t *testing.T) { } func TestMarathonSDRemoveApp(t *testing.T) { - ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + ch, md := newTestDiscovery(func(url string) (*AppList, error) { return marathonTestAppList(marathonValidLabel, 1), nil }) @@ -142,7 +141,7 @@ func TestMarathonSDRemoveApp(t *testing.T) { t.Fatalf("Got error on first update: %s", err) } - md.client = func(url string) (*marathon.AppList, error) { + md.Client = func(url string) (*AppList, error) { return marathonTestAppList(marathonValidLabel, 0), nil } err = md.updateServices(ch) @@ -152,17 +151,17 @@ func TestMarathonSDRemoveApp(t *testing.T) { } func TestMarathonSDRunAndStop(t *testing.T) { - ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + ch, md := newTestDiscovery(func(url string) (*AppList, error) { return marathonTestAppList(marathonValidLabel, 1), nil }) - md.refreshInterval = time.Millisecond * 10 + md.RefreshInterval = time.Millisecond * 10 ctx, cancel := context.WithCancel(context.Background()) go func() { select { case <-ch: cancel() - case <-time.After(md.refreshInterval * 3): + case <-time.After(md.RefreshInterval * 3): cancel() t.Fatalf("Update took too long.") } diff --git a/retrieval/discovery/marathon/objects.go b/retrieval/discovery/marathon/objects.go deleted file mode 100644 index 22cf42630..000000000 --- a/retrieval/discovery/marathon/objects.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package marathon - -// Task describes one instance of a service running on Marathon. -type Task struct { - ID string `json:"id"` - Host string `json:"host"` - Ports []uint32 `json:"ports"` -} - -// DockerContainer describes a container which uses the docker runtime. -type DockerContainer struct { - Image string `json:"image"` -} - -// Container describes the runtime an app in running in. -type Container struct { - Docker DockerContainer `json:"docker"` -} - -// App describes a service running on Marathon. -type App struct { - ID string `json:"id"` - Tasks []Task `json:"tasks"` - RunningTasks int `json:"tasksRunning"` - Labels map[string]string `json:"labels"` - Container Container `json:"container"` -} - -// AppList is a list of Marathon apps. -type AppList struct { - Apps []App `json:"apps"` -} diff --git a/retrieval/discovery/marathon/url.go b/retrieval/discovery/marathon/url.go deleted file mode 100644 index b988ab52b..000000000 --- a/retrieval/discovery/marathon/url.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package marathon - -import ( - "fmt" - "math/rand" -) - -const appListPath string = "/v2/apps/?embed=apps.tasks" - -// RandomAppsURL randomly selects a server from an array and creates -// an URL pointing to the app list. -func RandomAppsURL(servers []string) string { - // TODO: If possible update server list from Marathon at some point. - server := servers[rand.Intn(len(servers))] - return fmt.Sprintf("%s%s", server, appListPath) -} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 77443e31e..68051d95f 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -374,7 +374,7 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { app("consul", i, k) } for i, c := range cfg.MarathonSDConfigs { - app("marathon", i, discovery.NewMarathonDiscovery(c)) + app("marathon", i, discovery.NewMarathon(c)) } for i, c := range cfg.KubernetesSDConfigs { k, err := discovery.NewKubernetesDiscovery(c)