mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-27 13:42:31 -08:00
Merge pull request #902 from xperimental/feature/marathon-discovery
retrieval/discovery: Service discovery using marathon API
This commit is contained in:
commit
54202bc5a8
3
.dockerignore
Normal file
3
.dockerignore
Normal file
|
@ -0,0 +1,3 @@
|
|||
data/
|
||||
prometheus
|
||||
promtool
|
|
@ -98,6 +98,11 @@ var (
|
|||
DefaultServersetSDConfig = ServersetSDConfig{
|
||||
Timeout: Duration(10 * time.Second),
|
||||
}
|
||||
|
||||
// DefaultMarathonSDConfig is the default Marathon SD configuration.
|
||||
DefaultMarathonSDConfig = MarathonSDConfig{
|
||||
RefreshInterval: Duration(30 * time.Second),
|
||||
}
|
||||
)
|
||||
|
||||
// This custom URL type allows validating at configuration load time.
|
||||
|
@ -278,6 +283,8 @@ type ScrapeConfig struct {
|
|||
ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"`
|
||||
// List of Serverset service discovery configurations.
|
||||
ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"`
|
||||
// MarathonSDConfigs is a list of Marathon service discovery configurations.
|
||||
MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"`
|
||||
|
||||
// List of target relabel configurations.
|
||||
RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"`
|
||||
|
@ -538,6 +545,29 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
|
|||
return checkOverflow(c.XXX, "serverset_sd_config")
|
||||
}
|
||||
|
||||
// MarathonSDConfig is the configuration for services running on Marathon.
|
||||
type MarathonSDConfig struct {
|
||||
Servers []string `yaml:"servers,omitempty"`
|
||||
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
|
||||
|
||||
// Catches all undefined fields and must be empty after parsing.
|
||||
XXX map[string]interface{} `yaml:",inline"`
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
*c = DefaultMarathonSDConfig
|
||||
type plain MarathonSDConfig
|
||||
err := unmarshal((*plain)(c))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(c.Servers) == 0 {
|
||||
return fmt.Errorf("Marathon SD config must contain at least one Marathon server")
|
||||
}
|
||||
return checkOverflow(c.XXX, "marathon_sd_config")
|
||||
}
|
||||
|
||||
// RelabelAction is the action to be performed on relabeling.
|
||||
type RelabelAction string
|
||||
|
||||
|
@ -574,7 +604,7 @@ type RelabelConfig struct {
|
|||
// Separator is the string between concatenated values from the source labels.
|
||||
Separator string `yaml:"separator,omitempty"`
|
||||
// Regex against which the concatenation is matched.
|
||||
Regex *Regexp `yaml:"regex",omitempty`
|
||||
Regex *Regexp `yaml:"regex,omitempty"`
|
||||
// Modulus to take of the hash of concatenated values from the source labels.
|
||||
Modulus uint64 `yaml:"modulus,omitempty"`
|
||||
// The label to which the resulting string is written in a replacement.
|
||||
|
|
98
retrieval/discovery/marathon.go
Normal file
98
retrieval/discovery/marathon.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/log"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
|
||||
)
|
||||
|
||||
// MarathonDiscovery provides service discovery based on a Marathon instance.
|
||||
type MarathonDiscovery struct {
|
||||
servers []string
|
||||
refreshInterval time.Duration
|
||||
done chan struct{}
|
||||
lastRefresh map[string]*config.TargetGroup
|
||||
client marathon.AppListClient
|
||||
}
|
||||
|
||||
// NewMarathonDiscovery creates a new Marathon based discovery.
|
||||
func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery {
|
||||
return &MarathonDiscovery{
|
||||
servers: conf.Servers,
|
||||
refreshInterval: time.Duration(conf.RefreshInterval),
|
||||
done: make(chan struct{}),
|
||||
client: marathon.FetchMarathonApps,
|
||||
}
|
||||
}
|
||||
|
||||
// Sources implements the TargetProvider interface.
|
||||
func (md *MarathonDiscovery) Sources() []string {
|
||||
var sources []string
|
||||
tgroups, err := md.fetchTargetGroups()
|
||||
if err == nil {
|
||||
for source := range tgroups {
|
||||
sources = append(sources, source)
|
||||
}
|
||||
}
|
||||
return sources
|
||||
}
|
||||
|
||||
// Run implements the TargetProvider interface.
|
||||
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) {
|
||||
defer close(ch)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-md.done:
|
||||
log.Debug("Shutting down marathon discovery.")
|
||||
return
|
||||
case <-time.After(md.refreshInterval):
|
||||
err := md.updateServices(ch)
|
||||
if err != nil {
|
||||
log.Errorf("Error while updating services: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop implements the TargetProvider interface.
|
||||
func (md *MarathonDiscovery) Stop() {
|
||||
md.done <- struct{}{}
|
||||
}
|
||||
|
||||
func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error {
|
||||
targetMap, err := md.fetchTargetGroups()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update services which are still present
|
||||
for _, tg := range targetMap {
|
||||
ch <- tg
|
||||
}
|
||||
|
||||
// Remove services which did disappear
|
||||
for source := range md.lastRefresh {
|
||||
_, ok := targetMap[source]
|
||||
if !ok {
|
||||
log.Debugf("Removing group for %s", source)
|
||||
ch <- &config.TargetGroup{Source: source}
|
||||
}
|
||||
}
|
||||
|
||||
md.lastRefresh = targetMap
|
||||
return nil
|
||||
}
|
||||
|
||||
func (md *MarathonDiscovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
|
||||
url := marathon.RandomAppsURL(md.servers)
|
||||
apps, err := md.client(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
groups := marathon.AppsToTargetGroups(apps)
|
||||
return groups, nil
|
||||
}
|
34
retrieval/discovery/marathon/client.go
Normal file
34
retrieval/discovery/marathon/client.go
Normal file
|
@ -0,0 +1,34 @@
|
|||
package marathon
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// AppListClient defines a function that can be used to get an application list from marathon.
|
||||
type AppListClient func(url string) (*AppList, error)
|
||||
|
||||
// FetchMarathonApps requests a list of applications from a marathon server.
|
||||
func FetchMarathonApps(url string) (*AppList, error) {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return parseAppJSON(body)
|
||||
}
|
||||
|
||||
func parseAppJSON(body []byte) (*AppList, error) {
|
||||
apps := &AppList{}
|
||||
err := json.Unmarshal(body, apps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return apps, nil
|
||||
}
|
16
retrieval/discovery/marathon/constants.go
Normal file
16
retrieval/discovery/marathon/constants.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package marathon
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
const (
|
||||
// AppLabel is used for the name of the app in Marathon.
|
||||
AppLabel model.LabelName = "__meta_marathon_app"
|
||||
|
||||
// ImageLabel is the label that is used for the docker image running the service.
|
||||
ImageLabel model.LabelName = "__meta_marathon_image"
|
||||
|
||||
// TaskLabel contains the mesos task name of the app instance.
|
||||
TaskLabel model.LabelName = "__meta_marathon_task"
|
||||
)
|
76
retrieval/discovery/marathon/conversion.go
Normal file
76
retrieval/discovery/marathon/conversion.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package marathon
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
)
|
||||
|
||||
// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups.
|
||||
func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup {
|
||||
tgroups := map[string]*config.TargetGroup{}
|
||||
for _, a := range apps.Apps {
|
||||
if isValidApp(&a) {
|
||||
group := createTargetGroup(&a)
|
||||
tgroups[group.Source] = group
|
||||
}
|
||||
}
|
||||
return tgroups
|
||||
}
|
||||
|
||||
func createTargetGroup(app *App) *config.TargetGroup {
|
||||
var (
|
||||
targets = targetsForApp(app)
|
||||
source = targetGroupName(app)
|
||||
appName = clientmodel.LabelValue(sanitizeName(app.ID))
|
||||
image = clientmodel.LabelValue(imageName(app))
|
||||
)
|
||||
return &config.TargetGroup{
|
||||
Targets: targets,
|
||||
Labels: clientmodel.LabelSet{
|
||||
AppLabel: appName,
|
||||
ImageLabel: image,
|
||||
},
|
||||
Source: source,
|
||||
}
|
||||
}
|
||||
|
||||
func targetsForApp(app *App) []clientmodel.LabelSet {
|
||||
targets := make([]clientmodel.LabelSet, 0, len(app.Tasks))
|
||||
for _, t := range app.Tasks {
|
||||
target := targetForTask(&t)
|
||||
targets = append(targets, clientmodel.LabelSet{
|
||||
clientmodel.AddressLabel: clientmodel.LabelValue(target),
|
||||
TaskLabel: clientmodel.LabelValue(sanitizeName(t.ID)),
|
||||
})
|
||||
}
|
||||
return targets
|
||||
}
|
||||
|
||||
func imageName(app *App) string {
|
||||
return app.Container.Docker.Image
|
||||
}
|
||||
|
||||
func targetForTask(task *Task) string {
|
||||
return fmt.Sprintf("%s:%d", task.Host, task.Ports[0])
|
||||
}
|
||||
|
||||
func isValidApp(app *App) bool {
|
||||
if app.RunningTasks > 0 {
|
||||
_, ok := app.Labels["prometheus"]
|
||||
return ok
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func targetGroupName(app *App) string {
|
||||
return fmt.Sprintf("marathon:%s", sanitizeName(app.ID))
|
||||
}
|
||||
|
||||
func sanitizeName(id string) string {
|
||||
trimID := strings.TrimLeft(id, " -/.")
|
||||
return strings.Replace(trimID, "/", "-", -1)
|
||||
}
|
32
retrieval/discovery/marathon/objects.go
Normal file
32
retrieval/discovery/marathon/objects.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package marathon
|
||||
|
||||
// Task describes one instance of a service running on Marathon.
|
||||
type Task struct {
|
||||
ID string `json:"id"`
|
||||
Host string `json:"host"`
|
||||
Ports []uint32 `json:"ports"`
|
||||
}
|
||||
|
||||
// DockerContainer describes a container which uses the docker runtime.
|
||||
type DockerContainer struct {
|
||||
Image string `json:"image"`
|
||||
}
|
||||
|
||||
// Container describes the runtime an app in running in.
|
||||
type Container struct {
|
||||
Docker DockerContainer `json:"docker"`
|
||||
}
|
||||
|
||||
// App describes a service running on Marathon.
|
||||
type App struct {
|
||||
ID string `json:"id"`
|
||||
Tasks []Task `json:"tasks"`
|
||||
RunningTasks int `json:"tasksRunning"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
Container Container `json:"container"`
|
||||
}
|
||||
|
||||
// AppList is a list of Marathon apps.
|
||||
type AppList struct {
|
||||
Apps []App `json:"apps"`
|
||||
}
|
15
retrieval/discovery/marathon/url.go
Normal file
15
retrieval/discovery/marathon/url.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package marathon
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
)
|
||||
|
||||
const appListPath string = "/v2/apps/?embed=apps.tasks"
|
||||
|
||||
// RandomAppsURL randomly selects a server from an array and creates an URL pointing to the app list.
|
||||
func RandomAppsURL(servers []string) string {
|
||||
// TODO If possible update server list from Marathon at some point
|
||||
server := servers[rand.Intn(len(servers))]
|
||||
return fmt.Sprintf("%s%s", server, appListPath)
|
||||
}
|
200
retrieval/discovery/marathon_test.go
Normal file
200
retrieval/discovery/marathon_test.go
Normal file
|
@ -0,0 +1,200 @@
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
|
||||
)
|
||||
|
||||
var marathonValidLabel = map[string]string{"prometheus": "yes"}
|
||||
|
||||
func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) {
|
||||
ch := make(chan *config.TargetGroup)
|
||||
md := NewMarathonDiscovery(&config.MarathonSDConfig{
|
||||
Servers: []string{"http://localhost:8080"},
|
||||
})
|
||||
md.client = client
|
||||
return ch, md
|
||||
}
|
||||
|
||||
func TestMarathonSDHandleError(t *testing.T) {
|
||||
var errTesting = errors.New("testing failure")
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return nil, errTesting
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
t.Fatalf("Got group: %s", tg)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(ch)
|
||||
if err != errTesting {
|
||||
t.Fatalf("Expected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDEmptyList(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return &marathon.AppList{}, nil
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
t.Fatalf("Got group: %v", tg)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(ch)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func marathonTestAppList(labels map[string]string, runningTasks int) *marathon.AppList {
|
||||
task := marathon.Task{
|
||||
ID: "test-task-1",
|
||||
Host: "mesos-slave1",
|
||||
Ports: []uint32{31000},
|
||||
}
|
||||
docker := marathon.DockerContainer{Image: "repo/image:tag"}
|
||||
container := marathon.Container{Docker: docker}
|
||||
app := marathon.App{
|
||||
ID: "test-service",
|
||||
Tasks: []marathon.Task{task},
|
||||
RunningTasks: runningTasks,
|
||||
Labels: labels,
|
||||
Container: container,
|
||||
}
|
||||
return &marathon.AppList{
|
||||
Apps: []marathon.App{app},
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDSendGroup(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
if tg.Source != "marathon:test-service" {
|
||||
t.Fatalf("Wrong target group name: %s", tg.Source)
|
||||
}
|
||||
if len(tg.Targets) != 1 {
|
||||
t.Fatalf("Wrong number of targets: %v", tg.Targets)
|
||||
}
|
||||
tgt := tg.Targets[0]
|
||||
if tgt[clientmodel.AddressLabel] != "mesos-slave1:31000" {
|
||||
t.Fatalf("Wrong target address: %s", tgt[clientmodel.AddressLabel])
|
||||
}
|
||||
default:
|
||||
t.Fatal("Did not get a target group.")
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(ch)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDNoLabel(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(map[string]string{}, 1), nil
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
t.Fatalf("Got group: %s", tg)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(ch)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDNotRunning(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 0), nil
|
||||
})
|
||||
go func() {
|
||||
select {
|
||||
case tg := <-ch:
|
||||
t.Fatalf("Got group: %s", tg)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(ch)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDRemoveApp(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
go func() {
|
||||
up1 := <-ch
|
||||
up2 := <-ch
|
||||
if up2.Source != up1.Source {
|
||||
t.Fatalf("Source is different: %s", up2)
|
||||
if len(up2.Targets) > 0 {
|
||||
t.Fatalf("Got a non-empty target set: %s", up2.Targets)
|
||||
}
|
||||
}
|
||||
}()
|
||||
err := md.updateServices(ch)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error on first update: %s", err)
|
||||
}
|
||||
|
||||
md.client = func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 0), nil
|
||||
}
|
||||
err = md.updateServices(ch)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error on second update: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDSources(t *testing.T) {
|
||||
_, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
sources := md.Sources()
|
||||
if len(sources) != 1 {
|
||||
t.Fatalf("Wrong number of sources: %s", sources)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarathonSDRunAndStop(t *testing.T) {
|
||||
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
|
||||
return marathonTestAppList(marathonValidLabel, 1), nil
|
||||
})
|
||||
md.refreshInterval = time.Millisecond * 10
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ch:
|
||||
md.Stop()
|
||||
case <-time.After(md.refreshInterval * 3):
|
||||
md.Stop()
|
||||
t.Fatalf("Update took too long.")
|
||||
}
|
||||
}()
|
||||
md.Run(ch)
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatalf("Channel not closed.")
|
||||
}
|
||||
}
|
|
@ -398,6 +398,9 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
|
|||
for _, c := range cfg.ServersetSDConfigs {
|
||||
providers = append(providers, discovery.NewServersetDiscovery(c))
|
||||
}
|
||||
for _, c := range cfg.MarathonSDConfigs {
|
||||
providers = append(providers, discovery.NewMarathonDiscovery(c))
|
||||
}
|
||||
if len(cfg.TargetGroups) > 0 {
|
||||
providers = append(providers, NewStaticProvider(cfg.TargetGroups))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue